Skip to content

Commit

Permalink
Added new UI Functionality
Browse files Browse the repository at this point in the history
new page for xmd
new page to view logs
  • Loading branch information
datasetutil committed Mar 4, 2015
1 parent 8e6da0e commit ecc202c
Show file tree
Hide file tree
Showing 17 changed files with 1,223 additions and 74 deletions.
148 changes: 134 additions & 14 deletions src/main/java/com/sforce/dataset/flow/monitor/DataFlowMonitorUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
Expand Down Expand Up @@ -77,7 +79,7 @@ public static void getJobsAndErrorFiles(PartnerConnection partnerConnection, Str
public static List<JobEntry> getDataFlowJobs(PartnerConnection partnerConnection, String datasetName) throws ConnectionException, URISyntaxException, ClientProtocolException, IOException
{
List<JobEntry> jobsList = new LinkedList<JobEntry>();
System.out.println();
// System.out.println();
partnerConnection.getServerTimestamp();
ConnectorConfig config = partnerConnection.getConfig();
String sessionID = config.getSessionId();
Expand All @@ -97,7 +99,7 @@ public static List<JobEntry> getDataFlowJobs(PartnerConnection partnerConnection

listEMPost.setConfig(requestConfig);
listEMPost.addHeader("Authorization","OAuth "+sessionID);
System.out.println("Fetching job list from server, this may take a minute...");
// System.out.println("Fetching job list from server, this may take a minute...");
CloseableHttpResponse emresponse = httpClient.execute(listEMPost);
String reasonPhrase = emresponse.getStatusLine().getReasonPhrase();
int statusCode = emresponse.getStatusLine().getStatusCode();
Expand Down Expand Up @@ -187,21 +189,134 @@ public static List<JobEntry> getDataFlowJobs(PartnerConnection partnerConnection
throw new IOException(String.format("Dataflow job list download failed, invalid server response %s",emList));
}
}
System.out.println("Found {"+jobsList.size()+"} jobs for dataset {"+datasetName+"}");
// System.out.println("Found {"+jobsList.size()+"} jobs for dataset {"+datasetName+"}");
return jobsList;
}

public static boolean getJobErrorFile(PartnerConnection partnerConnection, String datasetName, String jobTrackerid) throws ConnectionException, URISyntaxException, ClientProtocolException, IOException
@SuppressWarnings({ "rawtypes", "unchecked" })
public static List<NodeEntry> getDataFlowJobNodes(PartnerConnection partnerConnection, String nodeUrl) throws ConnectionException, URISyntaxException, ClientProtocolException, IOException
{
List<NodeEntry> nodeList = new LinkedList<NodeEntry>();
// System.out.println();
partnerConnection.getServerTimestamp();
ConnectorConfig config = partnerConnection.getConfig();
String sessionID = config.getSessionId();
String serviceEndPoint = config.getServiceEndpoint();
CloseableHttpClient httpClient = HttpClients.createDefault();

RequestConfig requestConfig = RequestConfig.custom()
.setSocketTimeout(60000)
.setConnectTimeout(60000)
.setConnectionRequestTimeout(60000)
.build();

URI u = new URI(serviceEndPoint);

URI listEMURI = new URI(u.getScheme(),u.getUserInfo(), u.getHost(), u.getPort(), nodeUrl, null,null);
HttpGet listEMPost = new HttpGet(listEMURI);

listEMPost.setConfig(requestConfig);
listEMPost.addHeader("Authorization","OAuth "+sessionID);
// System.out.println("Fetching node list from server, this may take a minute...");
CloseableHttpResponse emresponse = httpClient.execute(listEMPost);
String reasonPhrase = emresponse.getStatusLine().getReasonPhrase();
int statusCode = emresponse.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(String.format("getDataFlowJobs failed: %d %s", statusCode,reasonPhrase));
}
HttpEntity emresponseEntity = emresponse.getEntity();
InputStream emis = emresponseEntity.getContent();
String emList = IOUtils.toString(emis, "UTF-8");
emis.close();
httpClient.close();

if(emList!=null && !emList.isEmpty())
{
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Map res = mapper.readValue(emList, Map.class);
// mapper.writerWithDefaultPrettyPrinter().writeValue(System.out, res);
List<Map> jobs = (List<Map>) res.get("result");
if(jobs != null && !jobs.isEmpty())
{
for(Map job:jobs)
{
String _type = (String) job.get("_type");
if(_type != null && _type.equals("nodes"))
{
NodeEntry nodeEntry = new NodeEntry();


nodeEntry.startTime = (String) job.get("startTime");

nodeEntry._uid = (String) job.get("_uid");
nodeEntry.duration = (String) job.get("duration");

nodeEntry.nodeName = (String) job.get("nodeName");

nodeEntry.nodeType = (String) job.get("nodeType");

nodeEntry._type = _type;

nodeEntry.status = (String) job.get("status");

Object temp = job.get("outputRowsFailed");
if(temp != null)
{
if(temp instanceof Number)
{
nodeEntry.outputRowsFailed = ((Number)temp).longValue();
}else
{
BigDecimal bd = new BigDecimal(temp.toString());
nodeEntry.outputRowsFailed = bd.longValue();
}
}

temp = job.get("outputRowsProcessed");
if(temp != null)
{
if(temp != null && temp instanceof Number)
{
nodeEntry.outputRowsProcessed = ((Number)temp).longValue();
}else
{
BigDecimal bd = new BigDecimal(temp.toString());
nodeEntry.outputRowsProcessed = bd.longValue();
}
}
nodeList.add(nodeEntry);
}
}
}else
{
throw new IOException(String.format("Dataflow job list download failed, invalid server response %s",emList));
}
}else
{
throw new IOException(String.format("Dataflow job list download failed, invalid server response %s",emList));
}
// System.out.println("Found {"+nodeList.size()+"} nodes for dataset {"+nodeUrl+"}");
return nodeList;
}

public static File getJobErrorFile(PartnerConnection partnerConnection, String datasetName, String jobTrackerid) throws ConnectionException, URISyntaxException, ClientProtocolException, IOException
{
if(jobTrackerid == null || jobTrackerid.trim().isEmpty())
{
System.out.println("Job TrackerId cannot be null");
return false;
throw new IOException("Job TrackerId cannot be null");
}

System.out.println();

if(datasetName == null || datasetName.trim().isEmpty())
{
throw new IOException("datasetName cannot be null");
}

// System.out.println();
partnerConnection.getServerTimestamp();
ConnectorConfig config = partnerConnection.getConfig();
ConnectorConfig config = partnerConnection.getConfig();
String orgId = partnerConnection.getUserInfo().getOrganizationId();

String sessionID = config.getSessionId();
String serviceEndPoint = config.getServiceEndpoint();
CloseableHttpClient httpClient = HttpClients.createDefault();
Expand All @@ -219,24 +334,29 @@ public static boolean getJobErrorFile(PartnerConnection partnerConnection, Strin

listEMPost.setConfig(requestConfig);
listEMPost.addHeader("Authorization","OAuth "+sessionID);
System.out.println("Fetching error sessionLog for job {"+jobTrackerid+"} from server...");
// System.out.println("Fetching error sessionLog for job {"+jobTrackerid+"} from server...");
CloseableHttpResponse emresponse = httpClient.execute(listEMPost);
String reasonPhrase = emresponse.getStatusLine().getReasonPhrase();
int statusCode = emresponse.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(String.format("getDataFlowJob node error sessionLog failed: %d %s", statusCode,reasonPhrase));
}
HttpEntity emresponseEntity = emresponse.getEntity();
InputStream emis = emresponseEntity.getContent();
File outfile = new File(datasetName+"_"+jobTrackerid+"_error.csv");
InputStream emis = emresponseEntity.getContent();

File dataDir = DatasetUtilConstants.getDataDir(orgId);
File parent = new File(dataDir,datasetName);
FileUtils.forceMkdir(parent);

File outfile = new File(parent, datasetName+"_"+jobTrackerid+"_error.csv");
System.out.println("fetching file {"+outfile+"}. Content-length {"+emresponseEntity.getContentLength()+"}");
BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(outfile),DatasetUtilConstants.DEFAULT_BUFFER_SIZE);
IOUtils.copy(emis, out);
out.close();
emis.close();
httpClient.close();
System.out.println("file {"+outfile+"} downloaded. Size{"+outfile.length()+"}\n");
return true;
// System.out.println("file {"+outfile+"} downloaded. Size{"+outfile.length()+"}\n");
return outfile;

// if(emList!=null && !emList.isEmpty())
// {
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/com/sforce/dataset/flow/monitor/JobEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,44 @@ public class JobEntry {
long _createdDateTime = 0;
String workflowName = null;
String nodeUrl = null;

public String getErrorMessage() {
return errorMessage;
}
public long getStartTimeEpoch() {
return startTimeEpoch;
}
public int getStatus() {
return status;
}
public long getEndTimeEpoch() {
return endTimeEpoch;
}
public String get_uid() {
return _uid;
}
public String getType() {
return type;
}
public String getEndTime() {
return endTime;
}
public String getStartTime() {
return startTime;
}
public String get_type() {
return _type;
}
public long getDuration() {
return duration;
}
public long get_createdDateTime() {
return _createdDateTime;
}
public String getWorkflowName() {
return workflowName;
}
public String getNodeUrl() {
return nodeUrl;
}
}
67 changes: 67 additions & 0 deletions src/main/java/com/sforce/dataset/flow/monitor/NodeEntry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2014, salesforce.com, inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided
* that the following conditions are met:
*
* Redistributions of source code must retain the above copyright notice, this list of conditions and the
* following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and
* the following disclaimer in the documentation and/or other materials provided with the distribution.
*
* Neither the name of salesforce.com, inc. nor the names of its contributors may be used to endorse or
* promote products derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
* TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package com.sforce.dataset.flow.monitor;

public class NodeEntry {
String startTime = null;// : Tue Mar 03 00:41:36 GMT 2015
String _type = null;// : nodes
String duration = null;// : 0 hours, 0 minutes
String status = null;// : success
long outputRowsFailed = 0;// : 5
long outputRowsProcessed = 0; // : 17934
String nodeName = null; // : digest
String _uid = null; // : 08JB00000000In9MAE
String nodeType = null;//: csvDigest

public String getStartTime() {
return startTime;
}
public String get_type() {
return _type;
}
public String getDuration() {
return duration;
}
public String getStatus() {
return status;
}
public long getOutputRowsFailed() {
return outputRowsFailed;
}
public long getOutputRowsProcessed() {
return outputRowsProcessed;
}
public String getNodeName() {
return nodeName;
}
public String get_uid() {
return _uid;
}
public String getNodeType() {
return nodeType;
}

}
Loading

0 comments on commit ecc202c

Please sign in to comment.