Skip to content
This repository has been archived by the owner on Sep 21, 2021. It is now read-only.

Commit

Permalink
#68 integrated plugin, to be debugged
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryKey committed Feb 10, 2017
1 parent d8ef00c commit 635121d
Show file tree
Hide file tree
Showing 9 changed files with 833 additions and 3 deletions.
4 changes: 1 addition & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<hadoop.version>1.2.1</hadoop.version>
<log4j.version>1.2.17</log4j.version>
<rhino.version>1.7R4</rhino.version>
<elasticsearch.version>1.6.0</elasticsearch.version>
<elasticsearch.version>2.2.0</elasticsearch.version>

<maven-shade-plugin.version>2.3</maven-shade-plugin.version>
<maven-assembly-plugin.version>2.4</maven-assembly-plugin.version>
Expand Down Expand Up @@ -81,13 +81,11 @@

<!-- Elasticsearch -->

<!-- TODO: uncomment following dependencies at future Elasticsearch release (supporting Lucene 5x)
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
-->

<!-- SOLR -->

Expand Down
199 changes: 199 additions & 0 deletions src/main/java/org/getopt/luke/plugins/RemoteFSDirectory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package org.getopt.luke.plugins;

import java.io.EOFException;
import java.io.IOException;
import java.util.Collection;

import org.apache.hadoop.fs.Path;
import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteFSDirectory extends Directory {
private static int bufferSize;

private String getFilePath(String name) {
return directory + "/" + name; // TODO
}

public RemoteFSDirectory(RemoteFileReader fileReader, String directory) {
this.fileReader = fileReader;
this.directory = directory;
}

private static final Logger LOG = LoggerFactory.getLogger(RemoteFSDirectory.class);

private RemoteFileReader fileReader;
private String directory;

@Override
public String[] listAll() throws IOException {
return fileReader.listAllFiles(directory);
}

@Override
public void deleteFile(String name) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public long fileLength(String name) throws IOException {
return fileReader.fileLength(getFilePath(name));
}

@Override
public IndexOutput createOutput(String name, IOContext context)
throws IOException {
throw new UnsupportedOperationException();
}

@Override
public IndexOutput createTempOutput(String s, String s1, IOContext ioContext) throws IOException {
return null;
}

@Override
public void sync(Collection<String> names) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public void rename(String s, String s1) throws IOException {

}

@Override
public void syncMetaData() throws IOException {

}

@Override
public IndexInput openInput(String name, IOContext context)
throws IOException {
RemoteFile file = new RemoteFile(fileReader, getFilePath(name));
return new RemoteFSIndexInput(name, file, context);
}

@Override
public Lock obtainLock(String s) throws IOException {
return null;
}

public Lock makeLock(final String name) {
return new Lock() {
public boolean obtain() {
return true;
}
public void release() {
}
public boolean isLocked() {
return false;
}
public String toString() {
return "Lock@" + new Path(directory, name);
}

public void close() throws IOException {
}

@Override
public void ensureValid() throws IOException {
}

};
}

@Override
public void close() throws IOException {
// TODO: close rmi connection?
}

static final class RemoteFSIndexInput extends BufferedIndexInput {
protected final RemoteFile file;
/** is this instance a clone and hence does not own the file to close it */
boolean isClone = false;
/** start offset: non-zero in the slice case */
protected final long off;
/** end offset (start+length) */
protected final long end;

public RemoteFSIndexInput(String resourceDesc, RemoteFile file, IOContext context) throws IOException {
super(resourceDesc, context);
this.file = file;
this.off = 0L;
this.end = file.length();
setBufferSize(bufferSize);
}

public RemoteFSIndexInput(String resourceDesc, RemoteFile file, long off, long length, int bufferSize) {
super(resourceDesc, bufferSize);
this.file = file;
this.off = off;
this.end = off + length;
this.isClone = true;
}

@Override
protected void readInternal(byte[] b, int offset, int len)
throws IOException {
long position = off + getFilePointer();
if (position + len > end) {
throw new EOFException("read past EOF: " + this);
}
System.out.println(String.format("Reading %d bytes from %s position=%d", len, file.path, position));
byte[] buf;
if (file.isInitialized()) {
buf = file.read(position, len);
System.arraycopy(buf, 0, b, offset, len);
} else {
buf = file.read(position, len);
System.arraycopy(buf, RemoteFileReaderImpl.INTEGER_SIZE, b, offset, len);
}
}

@Override
protected void seekInternal(long pos) throws IOException {
// TODO Auto-generated method stub

}

@Override
public void close() throws IOException {
if (!isClone) {
file.close();
}
}

@Override
public RemoteFSIndexInput clone() {
RemoteFSIndexInput clone = (RemoteFSIndexInput) super.clone();
clone.isClone = true;
return clone;
}

//@Override
// TODO: remove for 4.0.0 and lower?
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
if (offset < 0 || length < 0 || offset + length > this.length()) {
throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: " + this);
}
return new RemoteFSIndexInput(sliceDescription, file, off + offset, length, getBufferSize());
}

@Override
public long length() {
return end - off;
}

}

public static final void setBufferSize(int newSize) {
bufferSize = newSize;
}

}
76 changes: 76 additions & 0 deletions src/main/java/org/getopt/luke/plugins/RemoteFSTool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package org.getopt.luke.plugins;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import java.util.Arrays;

public class RemoteFSTool {
static final int CHUNK_SIZE = 8192;

static RemoteFileReader createRemoteFileReader(String host, String port) throws MalformedURLException, RemoteException, NotBoundException {
String uri = String.format("rmi://%s:%s/remotefilereader", host, port);
return (RemoteFileReader) Naming.lookup(uri);
}

private static void copyFile(RemoteFileReader fileReader, String path) throws IOException {
long len = fileReader.fileLength(path);
int total = 0;

FileOutputStream fos = new FileOutputStream((new File(path)).getName());

byte[] bytes = fileReader.read(path, total, (int) Math.min(CHUNK_SIZE, len-total));
int fileId = bytes[3] & 0xFF |
(bytes[2] & 0xFF) << 8 |
(bytes[1] & 0xFF) << 16 |
(bytes[0] & 0xFF) << 24;
int l = bytes.length - RemoteFileReaderImpl.INTEGER_SIZE;
fos.write(bytes, RemoteFileReaderImpl.INTEGER_SIZE, l);
total += l;

while (total < len) {
bytes = fileReader.read(fileId, total, (int) Math.min(CHUNK_SIZE, len-total));
fos.write(bytes);
total += bytes.length;
}
fos.close();
assert total == len;
fileReader.close(fileId);
}

public static void main(String[] args) throws NotBoundException, IOException {
if (args.length != 3) {
throw new IllegalArgumentException("Usage: java RemoteFSTool <list|exist|length|copy|close> <host> <filepath>");
}
String cmd = args[0];
String host = args[1];
String path = args[2];

RemoteFileReader fileReader = createRemoteFileReader(host, "1415");

switch (cmd) {
case "list":
System.out.println(Arrays.asList(fileReader.listAllFiles(path)));
break;
case "exist":
System.out.println(fileReader.fileExist(path));
break;
case "length":
System.out.println(fileReader.fileLength(path));
break;
case "copy":
copyFile(fileReader, path);
break;
case "close":
fileReader.close(Integer.parseInt(path));
break;
default:
throw new IllegalArgumentException("Unknown command " + cmd);
}
}

}
86 changes: 86 additions & 0 deletions src/main/java/org/getopt/luke/plugins/RemoteFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.getopt.luke.plugins;

import java.rmi.RemoteException;

/**
* Represent a random-access file on remote host
* @author minh.nguyen
*
*/
public class RemoteFile {
RemoteFileReader fileReader;
String path;

private int fileId = -1;
private boolean initialized = false;

public RemoteFile(RemoteFileReader fileReader, String path) {
this.fileReader = fileReader;
this.path = path;
}

/**
*
* @return
* <ul>
* <li><code>true</code> if the remote file has been initialized. This means the method <code>read</code> has been called before.</li>
* <li><code>false</code> if the remote file has not been initialized. This means the method <code>read</code> has never been called before.</li>
* </ul>
*/
public boolean isInitialized() {
return initialized;
}

/**
* Close the remote file
* @throws RemoteException
*/
public void close() throws RemoteException {
if (initialized) {
fileReader.close(fileId);
}
}

/**
* Read <code>len</code> bytes from the remote file starting at <code>position</code>
* Before invoking this method, client needs to call <code>isInitialized</code> to check the
* state of the remote file.
* <ul>
* <li>
* If <code>isInitialized</code> return <code>false</code>, this would be the
* first time the method <code>read</code> is invoked. The first 4 bytes
* of the return buffer is the header containing the file ID of the remote file. Client
* should skip this header before reading raw data. It is decided to put this inconvenience
* on the client for efficiency reason.
* </li>
* <li>
* If <code>isInitialized</code> return <code>true</code>, the entire return buffer contains the
* raw data.
* </li>
* <ul>
*
*/
public byte[] read(long position, int len) throws RemoteException {
if (initialized) {
return fileReader.read(fileId, position, len);
} else {
byte[] buf = fileReader.read(path, position, len);
fileId = buf[3] & 0xFF |
(buf[2] & 0xFF) << 8 |
(buf[1] & 0xFF) << 16 |
(buf[0] & 0xFF) << 24;
initialized = true;
// Return original buffer without stripping fileId header for efficiency reason
return buf;
}
}

/**
*
* @return the size of the remote file in bytes
* @throws RemoteException
*/
public long length() throws RemoteException {
return fileReader.fileLength(path);
}
}
Loading

0 comments on commit 635121d

Please sign in to comment.