Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GraphSONInputFormat and GraphSONOutputFormat now support modes, #169

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<email>[email protected]</email>
<url>http://matthiasb.com</url>
</contributor>
<contributor>
<name>Robin Syihab</name>
<email>[email protected]</email>
<url>http://robin.nosql.asia</url>
</contributor>
</contributors>
<inceptionYear>2012</inceptionYear>
<licenses>
Expand Down
16 changes: 11 additions & 5 deletions src/main/java/com/thinkaurelius/faunus/FaunusElement.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,17 @@ public void readFields(final DataInput in) throws IOException {
/**
 * Serializes this element to {@code out}: the id, the path-enabled flag, then either
 * the full paths (when paths are enabled) or only the path counter, and finally the
 * properties.
 *
 * @param out the sink the element is written to
 * @throws IOException if writing to {@code out} fails
 */
public void write(final DataOutput out) throws IOException {
    WritableUtils.writeVLong(out, this.id);
    out.writeBoolean(this.pathEnabled);

    try {
        if (this.pathEnabled) {
            ElementPaths.write(this.paths, out);
        } else {
            WritableUtils.writeVLong(out, this.pathCounter);
        }
        ElementProperties.write(this.properties, out);
    } catch (com.esotericsoftware.kryo.KryoException e) {
        // Add the offending element to the message, and keep the original exception
        // as the cause so its stack trace is not lost.
        throw new com.esotericsoftware.kryo.KryoException(
                "Kryo failed when processing " + this.toString() + ". " + e.getMessage(), e);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.util.io.graphson.ElementFactory;
import com.tinkerpop.blueprints.util.io.graphson.ElementPropertyConfig;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONTokens;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility;
import com.tinkerpop.blueprints.util.io.graphson.*;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
Expand All @@ -20,11 +16,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.*;

import static com.tinkerpop.blueprints.Direction.IN;
import static com.tinkerpop.blueprints.Direction.OUT;
Expand All @@ -44,7 +36,11 @@ public class FaunusGraphSONUtility {

private static final FaunusElementFactory elementFactory = new FaunusElementFactory();

private static final GraphSONUtility graphson = new GraphSONUtility(GraphSONMode.COMPACT, elementFactory,
private static final GraphSONUtility graphsonCompact = new GraphSONUtility(GraphSONMode.COMPACT, elementFactory,
ElementPropertyConfig.ExcludeProperties(VERTEX_IGNORE, EDGE_IGNORE));
private static final GraphSONUtility graphsonNormal = new GraphSONUtility(GraphSONMode.NORMAL, elementFactory,
ElementPropertyConfig.ExcludeProperties(VERTEX_IGNORE, EDGE_IGNORE));
private static final GraphSONUtility graphsonExtended = new GraphSONUtility(GraphSONMode.EXTENDED, elementFactory,
ElementPropertyConfig.ExcludeProperties(VERTEX_IGNORE, EDGE_IGNORE));

public static List<FaunusVertex> fromJSON(final InputStream in) throws IOException {
Expand All @@ -60,32 +56,60 @@ public static List<FaunusVertex> fromJSON(final InputStream in) throws IOExcepti
}

/**
 * Parses one GraphSON line into a vertex using {@link GraphSONMode#COMPACT}.
 *
 * @param line a single GraphSON-encoded vertex
 * @return whatever the mode-aware overload returns for {@code line}
 * @throws IOException if the mode-aware overload fails
 */
public static FaunusVertex fromJSON(String line) throws IOException {
    final GraphSONMode defaultMode = GraphSONMode.COMPACT;
    return fromJSON(line, defaultMode);
}

/**
 * Selects the pre-built {@link GraphSONUtility} for the requested mode.
 *
 * @param mode the GraphSON mode; {@code null} and any mode other than
 *             {@code EXTENDED} or {@code NORMAL} select the compact utility
 * @return the shared utility instance configured for {@code mode}
 */
private static GraphSONUtility getGraphSON(GraphSONMode mode) {
    // Identity comparison instead of a switch: switching on a null enum throws
    // NullPointerException, whereas here null simply falls through to compact.
    if (mode == GraphSONMode.EXTENDED) {
        return graphsonExtended;
    }
    if (mode == GraphSONMode.NORMAL) {
        return graphsonNormal;
    }
    return graphsonCompact;
}

/**
 * Parses one GraphSON line into a vertex, including its out- and in-edges,
 * using the utility that matches {@code mode}.
 *
 * @param line a single GraphSON-encoded vertex
 * @param mode the GraphSON mode the line was written in
 * @return the parsed vertex, or {@code null} if a {@link NullPointerException}
 *         is raised while parsing
 * @throws IOException wrapping any other failure raised while parsing
 */
public static FaunusVertex fromJSON(String line, GraphSONMode mode) throws IOException {
    try {
        final JSONObject json = new JSONObject(new JSONTokener(line));
        line = EMPTY_STRING; // clear up some memory

        // Pull the edge arrays out first and drop them from the JSON object, so the
        // vertex is built from the remaining fields only.
        final JSONArray outEArray = json.optJSONArray(_OUT_E);
        final JSONArray inEArray = json.optJSONArray(_IN_E);

        json.remove(_OUT_E); // clear up some memory
        json.remove(_IN_E); // clear up some memory

        final FaunusVertex vertex = (FaunusVertex) getGraphSON(mode).vertexFromJson(json);

        fromJSONEdges(vertex, outEArray, OUT, mode);
        fromJSONEdges(vertex, inEArray, IN, mode);

        return vertex;
    } catch (NullPointerException e) {
        // NOTE(review): a NullPointerException is reported as "no vertex" rather than
        // as an error; callers must check for null. Confirm this is intended, since
        // it hides the cause of a malformed record.
        return null;
    } catch (Exception e) {
        throw new IOException(e.getMessage(), e);
    }
}

/**
 * Attaches the given JSON edges to {@code vertex}, reading them in
 * {@link GraphSONMode#COMPACT} mode.
 *
 * @param vertex    the vertex the edges belong to
 * @param edges     the JSON edge array, forwarded unchanged to the mode-aware overload
 * @param direction the direction of the edges relative to {@code vertex}
 */
private static void fromJSONEdges(final FaunusVertex vertex, final JSONArray edges, final Direction direction) throws JSONException, IOException {
    final GraphSONMode defaultMode = GraphSONMode.COMPACT;
    fromJSONEdges(vertex, edges, direction, defaultMode);
}

private static void fromJSONEdges(final FaunusVertex vertex, final JSONArray edges, final Direction direction, GraphSONMode mode) throws JSONException, IOException {
if (null != edges) {
for (int i = 0; i < edges.length(); i++) {
final JSONObject edge = edges.optJSONObject(i);
FaunusEdge faunusEdge = null;

if (direction.equals(Direction.IN)) {
faunusEdge = (FaunusEdge) graphson.edgeFromJson(edge, new FaunusVertex(edge.optLong(GraphSONTokens._OUT_V)), vertex);
faunusEdge = (FaunusEdge) getGraphSON(mode).edgeFromJson(edge, new FaunusVertex(edge.optLong(GraphSONTokens._OUT_V)), vertex);
} else if (direction.equals(Direction.OUT)) {
faunusEdge = (FaunusEdge) graphson.edgeFromJson(edge, vertex, new FaunusVertex(edge.optLong(GraphSONTokens._IN_V)));
faunusEdge = (FaunusEdge) getGraphSON(mode).edgeFromJson(edge, vertex, new FaunusVertex(edge.optLong(GraphSONTokens._IN_V)));
}

if (faunusEdge != null) {
Expand All @@ -96,8 +120,12 @@ private static void fromJSONEdges(final FaunusVertex vertex, final JSONArray edg
}

/**
 * Serializes a vertex to GraphSON using {@link GraphSONMode#COMPACT}.
 *
 * @param vertex the vertex to serialize
 * @return whatever the mode-aware overload produces for {@code vertex}
 * @throws IOException if the mode-aware overload fails
 */
public static JSONObject toJSON(final Vertex vertex) throws IOException {
    final GraphSONMode defaultMode = GraphSONMode.COMPACT;
    return toJSON(vertex, defaultMode);
}

public static JSONObject toJSON(final Vertex vertex, GraphSONMode mode) throws IOException {
try {
final JSONObject object = GraphSONUtility.jsonFromElement(vertex, getElementPropertyKeys(vertex, false), GraphSONMode.COMPACT);
final JSONObject object = GraphSONUtility.jsonFromElement(vertex, getElementPropertyKeys(vertex, false), mode);

// force the ID to long. with blueprints, most implementations will send back a long, but
// some like TinkerGraph will return a string. the same is done for edges below
Expand All @@ -107,7 +135,7 @@ public static JSONObject toJSON(final Vertex vertex) throws IOException {
if (!edges.isEmpty()) {
final JSONArray outEdgesArray = new JSONArray();
for (final Edge outEdge : edges) {
final JSONObject edgeObject = GraphSONUtility.jsonFromElement(outEdge, getElementPropertyKeys(outEdge, true), GraphSONMode.COMPACT);
final JSONObject edgeObject = GraphSONUtility.jsonFromElement(outEdge, getElementPropertyKeys(outEdge, true), mode);
outEdgesArray.put(edgeObject);
}
object.put(_OUT_E, outEdgesArray);
Expand All @@ -117,7 +145,7 @@ public static JSONObject toJSON(final Vertex vertex) throws IOException {
if (!edges.isEmpty()) {
final JSONArray inEdgesArray = new JSONArray();
for (final Edge inEdge : edges) {
final JSONObject edgeObject = GraphSONUtility.jsonFromElement(inEdge, getElementPropertyKeys(inEdge, false), GraphSONMode.COMPACT);
final JSONObject edgeObject = GraphSONUtility.jsonFromElement(inEdge, getElementPropertyKeys(inEdge, false), mode);
inEdgesArray.put(edgeObject);
}
object.put(_IN_E, inEdgesArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.thinkaurelius.faunus.FaunusVertex;
import com.thinkaurelius.faunus.formats.VertexQueryFilter;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand All @@ -25,7 +26,20 @@ public class GraphSONInputFormat extends FileInputFormat<NullWritable, FaunusVer

/**
 * Creates a GraphSON record reader whose parsing mode is taken from the
 * {@code faunus.graphson.mode} job property.
 *
 * <p>{@code "normal"} selects {@link GraphSONMode#NORMAL} and {@code "extended"}
 * selects {@link GraphSONMode#EXTENDED}. Any other value, including an unset
 * property, selects {@link GraphSONMode#COMPACT}.
 *
 * @param split   the input split (not used here)
 * @param context the task context whose configuration supplies the mode
 * @return a reader configured with this format's vertex query and the chosen mode
 */
@Override
public RecordReader<NullWritable, FaunusVertex> createRecordReader(final InputSplit split, final TaskAttemptContext context) {
    // getRaw returns null when the property is not configured.
    final String modeStr = context.getConfiguration().getRaw("faunus.graphson.mode");

    final GraphSONMode mode;
    // Literal-first equals() is null-safe, so an unset property falls through to COMPACT.
    if ("normal".equals(modeStr)) {
        mode = GraphSONMode.NORMAL;
    } else if ("extended".equals(modeStr)) {
        mode = GraphSONMode.EXTENDED;
    } else {
        mode = GraphSONMode.COMPACT;
    }

    return new GraphSONRecordReader(this.vertexQuery, mode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import com.thinkaurelius.faunus.FaunusVertex;
import com.thinkaurelius.faunus.formats.FaunusFileOutputFormat;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.DataOutputStream;
import java.io.IOException;

/**
Expand All @@ -15,6 +17,29 @@ public class GraphSONOutputFormat extends FaunusFileOutputFormat {

/**
 * Creates a GraphSON record writer whose output mode is taken from the
 * {@code faunus.graphson.mode} job property.
 *
 * <p>{@code "normal"} selects {@link GraphSONMode#NORMAL} and {@code "extended"}
 * selects {@link GraphSONMode#EXTENDED}. Any other value, including an unset
 * property, selects {@link GraphSONMode#COMPACT}.
 *
 * @param job the task context supplying the output stream and configuration
 * @return a writer that emits one UTF-8 encoded JSON line per non-null vertex
 * @throws IOException          declared by the overridden method
 * @throws InterruptedException declared by the overridden method
 */
@Override
public RecordWriter<NullWritable, FaunusVertex> getRecordWriter(final TaskAttemptContext job) throws IOException, InterruptedException {
    final DataOutputStream os = super.getDataOuputStream(job);

    // getRaw returns null when the property is not configured.
    final String modeStr = job.getConfiguration().getRaw("faunus.graphson.mode");

    final GraphSONMode mode;
    // Compare by value: == on strings tests reference identity and would never match
    // a value read from the configuration. Literal-first equals() is also null-safe.
    if ("normal".equals(modeStr)) {
        mode = GraphSONMode.NORMAL;
    } else if ("extended".equals(modeStr)) {
        mode = GraphSONMode.EXTENDED;
    } else {
        mode = GraphSONMode.COMPACT;
    }

    return new GraphSONRecordWriter(os) {

        @Override
        public void write(NullWritable key, FaunusVertex vertex) throws IOException {
            // Null vertices are skipped; nothing is written for them.
            if (null != vertex) {
                this.out.write(FaunusGraphSONUtility.toJSON(vertex, mode).toString().getBytes("UTF-8"));
                this.out.write(NEWLINE);
            }
        }
    };
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.thinkaurelius.faunus.FaunusVertex;
import com.thinkaurelius.faunus.formats.VertexQueryFilter;
import com.thinkaurelius.faunus.mapreduce.FaunusCompiler;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
Expand All @@ -21,10 +22,16 @@ public class GraphSONRecordReader extends RecordReader<NullWritable, FaunusVerte
private final LineRecordReader lineRecordReader;
private final VertexQueryFilter vertexQuery;
private FaunusVertex vertex = null;
private GraphSONMode mode;

/**
 * Creates a reader that uses {@link GraphSONMode#COMPACT}.
 *
 * @param vertexQuery the vertex query filter, passed through to the two-argument constructor
 */
public GraphSONRecordReader(VertexQueryFilter vertexQuery) {
this(vertexQuery, GraphSONMode.COMPACT);
}

/**
 * Creates a reader with an explicit GraphSON mode.
 *
 * @param vertexQuery the vertex query filter kept by this reader
 * @param mode        the GraphSON mode kept by this reader
 */
public GraphSONRecordReader(VertexQueryFilter vertexQuery, GraphSONMode mode) {
    // Plain field initialisation; the three assignments are independent of each other.
    this.mode = mode;
    this.vertexQuery = vertexQuery;
    this.lineRecordReader = new LineRecordReader();
}

@Override
Expand All @@ -38,9 +45,15 @@ public boolean nextKeyValue() throws IOException {
if (!this.lineRecordReader.nextKeyValue())
return false;

this.vertex = FaunusGraphSONUtility.fromJSON(this.lineRecordReader.getCurrentValue().toString());
this.vertex = FaunusGraphSONUtility.fromJSON(this.lineRecordReader.getCurrentValue().toString(), mode);

if (this.vertex == null)
return false;


this.vertexQuery.defaultFilter(this.vertex);
this.vertex.enablePath(this.pathEnabled);

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public class GraphSONRecordWriter extends RecordWriter<NullWritable, FaunusVertex> {
private static final String UTF8 = "UTF-8";
private static final byte[] NEWLINE;
protected static final String UTF8 = "UTF-8";
protected static final byte[] NEWLINE;
protected DataOutputStream out;

static {
Expand Down