Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faunus Accumulo Data Source/Sink #145

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,21 @@
<artifactId>titan-hbase</artifactId>
<version>${titan.version}</version>
</dependency>
<dependency>
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>titan-accumulo-core</artifactId>
<version>${titan.version}</version>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>1.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.6.1</version>
</dependency>
<dependency>
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>titan-es</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.thinkaurelius.faunus.formats.titan.accumulo;

import com.google.common.base.Preconditions;
import com.thinkaurelius.faunus.FaunusVertex;
import com.thinkaurelius.faunus.formats.titan.FaunusTitanGraph;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.Entry;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StaticBufferEntry;
import com.thinkaurelius.titan.diskstorage.util.StaticByteBuffer;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;

/**
* @author Etienne Deprit <[email protected]>
*/
public class FaunusTitanAccumuloGraph extends FaunusTitanGraph {

public FaunusTitanAccumuloGraph(final String configFile) throws ConfigurationException {
this(new PropertiesConfiguration(configFile));
}

public FaunusTitanAccumuloGraph(final Configuration configuration) {
super(configuration);
}

public FaunusVertex readFaunusVertex(byte[] key, final Iterator<Map.Entry<Key, Value>> columnIterator) {
return super.readFaunusVertex(ByteBuffer.wrap(key), new AccumuloMapIterable(columnIterator));
}

private static class AccumuloMapIterable implements Iterable<Entry> {

private final Iterator<Map.Entry<Key, Value>> columnIterator;

public AccumuloMapIterable(final Iterator<Map.Entry<Key, Value>> columnIterator) {
Preconditions.checkNotNull(columnIterator);
this.columnIterator = columnIterator;
}

@Override
public Iterator<Entry> iterator() {
return new AccumuloMapIterator(columnIterator);
}
}

private static class AccumuloMapIterator implements Iterator<Entry> {

private final Iterator<Map.Entry<Key, Value>> iterator;

public AccumuloMapIterator(final Iterator<Map.Entry<Key, Value>> iterator) {
this.iterator = iterator;
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public Entry next() {
final Map.Entry<Key, Value> entry = iterator.next();
return new StaticBufferEntry(new StaticByteBuffer(entry.getKey().getColumnQualifier().getBytes()),
new StaticByteBuffer(entry.getValue().get()));
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.thinkaurelius.faunus.formats.titan.accumulo;

import com.thinkaurelius.faunus.FaunusVertex;
import com.thinkaurelius.faunus.formats.VertexQueryFilter;
import com.thinkaurelius.faunus.formats.titan.GraphFactory;
import com.thinkaurelius.faunus.formats.titan.TitanInputFormat;
import static com.thinkaurelius.faunus.formats.titan.TitanInputFormat.FAUNUS_GRAPH_INPUT_TITAN_STORAGE_HOSTNAME;
import static com.thinkaurelius.faunus.formats.titan.TitanInputFormat.FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT;
import com.thinkaurelius.faunus.mapreduce.FaunusCompiler;
import com.thinkaurelius.titan.diskstorage.Backend;
import com.thinkaurelius.titan.diskstorage.accumulo.AccumuloKeyColumnValueStore;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.SliceQuery;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Text;

/**
* @author Etienne Deprit <[email protected]>
*/
public class TitanAccumuloInputFormat extends TitanInputFormat {

// Accumulo store manager configuration
public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME =
"faunus.graph.input.titan.storage.tablename";
public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_INSTANCE =
"faunus.graph.input.titan.storage.accumulo-config.instance";
public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_USERNAME =
"faunus.graph.input.titan.storage.accumulo-config.username";
public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_PASSWORD =
"faunus.graph.input.titan.storage.accumulo-config.password";
public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_SERVER_ITERATORS =
"faunus.graph.input.titan.storage.accumulo-config.server-side-iterators";
// Instance variables
private final AccumuloRowInputFormat accumuloRowInputFormat = new AccumuloRowInputFormat();
private Configuration configuration;
private FaunusTitanAccumuloGraph graph;
private VertexQueryFilter vertexQuery;
private boolean pathEnabled;

@Override
public List<InputSplit> getSplits(final JobContext jobContext) throws IOException, InterruptedException {
return accumuloRowInputFormat.getSplits(jobContext);
}

@Override
public RecordReader<NullWritable, FaunusVertex> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
return new TitanAccumuloRecordReader(graph, vertexQuery, pathEnabled, accumuloRowInputFormat.createRecordReader(inputSplit, taskAttemptContext));
}

@Override
public void setConf(final Configuration config) {
configuration = config;
graph = new FaunusTitanAccumuloGraph(GraphFactory.generateTitanConfiguration(config, FAUNUS_GRAPH_INPUT_TITAN));
vertexQuery = VertexQueryFilter.create(config);
pathEnabled = config.getBoolean(FaunusCompiler.PATH_ENABLED, false);

String instance = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_INSTANCE);
String zookeepers = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_HOSTNAME);

if (config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT, null) != null) {
zookeepers = StringUtils.join(zookeepers.split(","),
":" + config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT) + ",");
}

try {
AccumuloRowInputFormat.setZooKeeperInstance(config, instance, zookeepers);
} catch (IllegalStateException ex) {
// zookeeper instance reset in map task
}

String username = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_USERNAME);
String password = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_PASSWORD);

String tableName = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME);
boolean serverSideIterators = config.getBoolean(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_SERVER_ITERATORS, false);

try {
AccumuloRowInputFormat.setInputInfo(config, username, password.getBytes(), tableName, new Authorizations());
} catch (IllegalStateException ex) {
// input info reset in map task
}

config.set("storage.read-only", "true");
config.set("autotype", "none");

Collection<Pair<Text, Text>> columnFamilyColumnQualifierPairs =
Collections.singletonList(new Pair<Text, Text>(new Text(Backend.EDGESTORE_NAME), null));

AccumuloRowInputFormat.fetchColumns(config, columnFamilyColumnQualifierPairs);

SliceQuery sliceQuery = TitanInputFormat.inputSlice(vertexQuery, graph);
IteratorSetting is = AccumuloKeyColumnValueStore.getColumnSliceIterator(sliceQuery);

if (is != null) {
AccumuloRowInputFormat.addIterator(config, is);
if (!serverSideIterators) {
AccumuloRowInputFormat.setLocalIterators(config, true);
}
}
}

@Override
public Configuration getConf() {
return configuration;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.thinkaurelius.faunus.formats.titan.accumulo;

import com.thinkaurelius.faunus.formats.titan.TitanOutputFormat;

/**
* @author Etienne Deprit <[email protected]>
*/
public class TitanAccumuloOutputFormat extends TitanOutputFormat {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.thinkaurelius.faunus.formats.titan.accumulo;

import com.thinkaurelius.faunus.FaunusVertex;
import com.thinkaurelius.faunus.formats.VertexQueryFilter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.Map;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;

/**
* @author Etienne Deprit <[email protected]>
*/
public class TitanAccumuloRecordReader extends RecordReader<NullWritable, FaunusVertex> {

private RecordReader<Text, PeekingIterator<Map.Entry<Key, Value>>> reader;
private FaunusTitanAccumuloGraph graph;
private VertexQueryFilter vertexQuery;
private boolean pathEnabled;
private FaunusVertex vertex;

public TitanAccumuloRecordReader(final FaunusTitanAccumuloGraph graph,
final VertexQueryFilter vertexQuery, final boolean pathEnabled,
final RecordReader<Text, PeekingIterator<Map.Entry<Key, Value>>> reader) {
this.graph = graph;
this.vertexQuery = vertexQuery;
this.pathEnabled = pathEnabled;
this.reader = reader;
}

@Override
public void initialize(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
reader.initialize(inputSplit, taskAttemptContext);
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
while (reader.nextKeyValue()) {
final FaunusVertex temp = graph.readFaunusVertex(reader.getCurrentKey().getBytes(), reader.getCurrentValue());
if (null != temp) {
if (this.pathEnabled) {
temp.enablePath(true);
}
vertex = temp;
vertexQuery.defaultFilter(this.vertex);
return true;
}
}
return false;
}

@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}

@Override
public FaunusVertex getCurrentValue() throws IOException, InterruptedException {
return vertex;
}

@Override
public void close() throws IOException {
graph.shutdown();
reader.close();
}

@Override
public float getProgress() throws IOException, InterruptedException {
return reader.getProgress();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class Imports {
imports.add("com.thinkaurelius.faunus.formats.titan.*");
imports.add("com.thinkaurelius.faunus.formats.titan.hbase.*");
imports.add("com.thinkaurelius.faunus.formats.titan.cassandra.*");
imports.add("com.thinkaurelius.faunus.formats.titan.accumulo.*");
imports.add("com.thinkaurelius.faunus.hdfs.*");
imports.add("com.thinkaurelius.faunus.tinkerpop.gremlin.*");
imports.add("com.tinkerpop.gremlin.Tokens.T");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.thinkaurelius.faunus.formats.titan.accumulo;

import com.thinkaurelius.faunus.tinkerpop.gremlin.Imports;
import junit.framework.TestCase;

/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public class TitanAccumuloInputFormatTest extends TestCase {

public void testInGremlinImports() {
assertTrue(Imports.getImports().contains(TitanAccumuloInputFormat.class.getPackage().getName() + ".*"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.thinkaurelius.faunus.formats.titan.accumulo;

import com.thinkaurelius.faunus.formats.titan.hbase.*;
import com.thinkaurelius.faunus.tinkerpop.gremlin.Imports;
import junit.framework.TestCase;

/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public class TitanAccumuloOutputFormatTest extends TestCase {

public void testInGremlinImports() {
assertTrue(Imports.getImports().contains(TitanAccumuloOutputFormat.class.getPackage().getName() + ".*"));
}
}