diff --git a/pom.xml b/pom.xml index 1a25bbc..4f484d1 100755 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ 1.2.1 1.2.17 1.7R4 - 1.6.0 + 2.2.0 2.3 2.4 @@ -81,13 +81,11 @@ - diff --git a/src/main/java/org/getopt/luke/plugins/RemoteFSDirectory.java b/src/main/java/org/getopt/luke/plugins/RemoteFSDirectory.java new file mode 100644 index 0000000..dc2ffa8 --- /dev/null +++ b/src/main/java/org/getopt/luke/plugins/RemoteFSDirectory.java @@ -0,0 +1,199 @@ +package org.getopt.luke.plugins; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Collection; + +import org.apache.hadoop.fs.Path; +import org.apache.lucene.store.BufferedIndexInput; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.Lock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RemoteFSDirectory extends Directory { + private static int bufferSize; + + private String getFilePath(String name) { + return directory + "/" + name; // TODO + } + + public RemoteFSDirectory(RemoteFileReader fileReader, String directory) { + this.fileReader = fileReader; + this.directory = directory; + } + + private static final Logger LOG = LoggerFactory.getLogger(RemoteFSDirectory.class); + + private RemoteFileReader fileReader; + private String directory; + + @Override + public String[] listAll() throws IOException { + return fileReader.listAllFiles(directory); + } + + @Override + public void deleteFile(String name) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long fileLength(String name) throws IOException { + return fileReader.fileLength(getFilePath(name)); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public IndexOutput createTempOutput(String s, String 
s1, IOContext ioContext) throws IOException { + return null; + } + + @Override + public void sync(Collection names) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void rename(String s, String s1) throws IOException { + + } + + @Override + public void syncMetaData() throws IOException { + + } + + @Override + public IndexInput openInput(String name, IOContext context) + throws IOException { + RemoteFile file = new RemoteFile(fileReader, getFilePath(name)); + return new RemoteFSIndexInput(name, file, context); + } + + @Override + public Lock obtainLock(String s) throws IOException { + return null; + } + + public Lock makeLock(final String name) { + return new Lock() { + public boolean obtain() { + return true; + } + public void release() { + } + public boolean isLocked() { + return false; + } + public String toString() { + return "Lock@" + new Path(directory, name); + } + + public void close() throws IOException { + } + + @Override + public void ensureValid() throws IOException { + } + + }; + } + + @Override + public void close() throws IOException { + // TODO: close rmi connection? 
+ } + + static final class RemoteFSIndexInput extends BufferedIndexInput { + protected final RemoteFile file; + /** is this instance a clone and hence does not own the file to close it */ + boolean isClone = false; + /** start offset: non-zero in the slice case */ + protected final long off; + /** end offset (start+length) */ + protected final long end; + + public RemoteFSIndexInput(String resourceDesc, RemoteFile file, IOContext context) throws IOException { + super(resourceDesc, context); + this.file = file; + this.off = 0L; + this.end = file.length(); + setBufferSize(bufferSize); + } + + public RemoteFSIndexInput(String resourceDesc, RemoteFile file, long off, long length, int bufferSize) { + super(resourceDesc, bufferSize); + this.file = file; + this.off = off; + this.end = off + length; + this.isClone = true; + } + + @Override + protected void readInternal(byte[] b, int offset, int len) + throws IOException { + long position = off + getFilePointer(); + if (position + len > end) { + throw new EOFException("read past EOF: " + this); + } + System.out.println(String.format("Reading %d bytes from %s position=%d", len, file.path, position)); + byte[] buf; + if (file.isInitialized()) { + buf = file.read(position, len); + System.arraycopy(buf, 0, b, offset, len); + } else { + buf = file.read(position, len); + System.arraycopy(buf, RemoteFileReaderImpl.INTEGER_SIZE, b, offset, len); + } + } + + @Override + protected void seekInternal(long pos) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void close() throws IOException { + if (!isClone) { + file.close(); + } + } + + @Override + public RemoteFSIndexInput clone() { + RemoteFSIndexInput clone = (RemoteFSIndexInput) super.clone(); + clone.isClone = true; + return clone; + } + + //@Override + // TODO: remove for 4.0.0 and lower? 
+ public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + if (offset < 0 || length < 0 || offset + length > this.length()) { + throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: " + this); + } + return new RemoteFSIndexInput(sliceDescription, file, off + offset, length, getBufferSize()); + } + + @Override + public long length() { + return end - off; + } + + } + + public static final void setBufferSize(int newSize) { + bufferSize = newSize; + } + +} \ No newline at end of file diff --git a/src/main/java/org/getopt/luke/plugins/RemoteFSTool.java b/src/main/java/org/getopt/luke/plugins/RemoteFSTool.java new file mode 100644 index 0000000..69e5daf --- /dev/null +++ b/src/main/java/org/getopt/luke/plugins/RemoteFSTool.java @@ -0,0 +1,76 @@ +package org.getopt.luke.plugins; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.MalformedURLException; +import java.rmi.Naming; +import java.rmi.NotBoundException; +import java.rmi.RemoteException; +import java.util.Arrays; + +public class RemoteFSTool { + static final int CHUNK_SIZE = 8192; + + static RemoteFileReader createRemoteFileReader(String host, String port) throws MalformedURLException, RemoteException, NotBoundException { + String uri = String.format("rmi://%s:%s/remotefilereader", host, port); + return (RemoteFileReader) Naming.lookup(uri); + } + + private static void copyFile(RemoteFileReader fileReader, String path) throws IOException { + long len = fileReader.fileLength(path); + int total = 0; + + FileOutputStream fos = new FileOutputStream((new File(path)).getName()); + + byte[] bytes = fileReader.read(path, total, (int) Math.min(CHUNK_SIZE, len-total)); + int fileId = bytes[3] & 0xFF | + (bytes[2] & 0xFF) << 8 | + (bytes[1] & 0xFF) << 16 | + (bytes[0] & 0xFF) << 24; + int l = bytes.length - RemoteFileReaderImpl.INTEGER_SIZE; + fos.write(bytes, RemoteFileReaderImpl.INTEGER_SIZE, l); + 
total += l; + + while (total < len) { + bytes = fileReader.read(fileId, total, (int) Math.min(CHUNK_SIZE, len-total)); + fos.write(bytes); + total += bytes.length; + } + fos.close(); + assert total == len; + fileReader.close(fileId); + } + + public static void main(String[] args) throws NotBoundException, IOException { + if (args.length != 3) { + throw new IllegalArgumentException("Usage: java RemoteFSTool "); + } + String cmd = args[0]; + String host = args[1]; + String path = args[2]; + + RemoteFileReader fileReader = createRemoteFileReader(host, "1415"); + + switch (cmd) { + case "list": + System.out.println(Arrays.asList(fileReader.listAllFiles(path))); + break; + case "exist": + System.out.println(fileReader.fileExist(path)); + break; + case "length": + System.out.println(fileReader.fileLength(path)); + break; + case "copy": + copyFile(fileReader, path); + break; + case "close": + fileReader.close(Integer.parseInt(path)); + break; + default: + throw new IllegalArgumentException("Unknown command " + cmd); + } + } + +} \ No newline at end of file diff --git a/src/main/java/org/getopt/luke/plugins/RemoteFile.java b/src/main/java/org/getopt/luke/plugins/RemoteFile.java new file mode 100644 index 0000000..deb4255 --- /dev/null +++ b/src/main/java/org/getopt/luke/plugins/RemoteFile.java @@ -0,0 +1,86 @@ +package org.getopt.luke.plugins; + +import java.rmi.RemoteException; + +/** + * Represent a random-access file on remote host + * @author minh.nguyen + * + */ +public class RemoteFile { + RemoteFileReader fileReader; + String path; + + private int fileId = -1; + private boolean initialized = false; + + public RemoteFile(RemoteFileReader fileReader, String path) { + this.fileReader = fileReader; + this.path = path; + } + + /** + * + * @return + *
    + *
  • true if the remote file has been initialized. This means the method read has been called before.
  • + *
  • false if the remote file has not been initialized. This means the method read has never been called before.
  • + *
+ */ + public boolean isInitialized() { + return initialized; + } + + /** + * Close the remote file + * @throws RemoteException + */ + public void close() throws RemoteException { + if (initialized) { + fileReader.close(fileId); + } + } + + /** + * Read len bytes from the remote file starting at position + * Before invoking this method, client needs to call isInitialized to check the + * state of the remote file. + *
    + *
  • + * If isInitialized return false, this would be the + * first time the method read is invoked. The first 4 bytes + * of the return buffer is the header containing the file ID of the remote file. Client + * should skip this header before reading raw data. It is decided to put this inconvenience + * on the client for efficiency reason. + *
  • + *
  • + * If isInitialized return true, the entire return buffer contains the + * raw data. + *
  • + *
      + * + */ + public byte[] read(long position, int len) throws RemoteException { + if (initialized) { + return fileReader.read(fileId, position, len); + } else { + byte[] buf = fileReader.read(path, position, len); + fileId = buf[3] & 0xFF | + (buf[2] & 0xFF) << 8 | + (buf[1] & 0xFF) << 16 | + (buf[0] & 0xFF) << 24; + initialized = true; + // Return original buffer without stripping fileId header for efficiency reason + return buf; + } + } + + /** + * + * @return the size of the remote file in bytes + * @throws RemoteException + */ + public long length() throws RemoteException { + return fileReader.fileLength(path); + } +} \ No newline at end of file diff --git a/src/main/java/org/getopt/luke/plugins/RemoteFileReader.java b/src/main/java/org/getopt/luke/plugins/RemoteFileReader.java new file mode 100644 index 0000000..62c13e3 --- /dev/null +++ b/src/main/java/org/getopt/luke/plugins/RemoteFileReader.java @@ -0,0 +1,61 @@ +package org.getopt.luke.plugins; + +import java.rmi.Remote; +import java.rmi.RemoteException; + +public interface RemoteFileReader extends Remote { + + /** + * Close the remote file + * @param fileId + * @throws RemoteException + */ + void close(int fileId) throws RemoteException; + + /** + * Read len bytes from the remote file specified by the full path starting at position. + * The first 4 bytes of the return buffer is the header containing the file ID of the remote file. + * Client should skip this header before reading raw data. The file ID should be used in subsequent + * reads to get better performance by reusing cached file handler + * + */ + byte[] read(String path, long position, int len) throws RemoteException; + + /** + * Read len bytes from the remote file specified by the fileId starting + * at position. + * The entire return buffer contains the raw data. 
/**
 * RMI contract for reading files that live on the server's filesystem.
 */
public interface RemoteFileReader extends Remote {

  /**
   * Lists the entries of a remote directory.
   *
   * @param path directory on remote host
   * @return list of all files in the given path
   * @throws RemoteException if the remote call fails
   */
  String[] listAllFiles(String path) throws RemoteException;

  /**
   * Tests whether a remote file exists.
   *
   * @param path file on remote host
   * @return true if remote file exists, false otherwise
   * @throws RemoteException if the remote call fails
   */
  boolean fileExist(String path) throws RemoteException;

  /**
   * Reports the size of a remote file.
   *
   * @param path file on remote host
   * @return the size of the remote file in bytes
   * @throws RemoteException if the remote call fails
   */
  long fileLength(String path) throws RemoteException;

  /**
   * Reads <code>len</code> bytes from the remote file specified by the full <code>path</code>,
   * starting at <code>position</code>. The first 4 bytes of the returned buffer are a header
   * containing the file ID of the remote file; the client should skip this header before
   * reading raw data. The file ID should be used in subsequent reads to get better performance
   * by reusing the cached file handle.
   *
   * @param path file on remote host
   * @param position offset in the file at which reading starts
   * @param len number of data bytes requested
   * @return the file ID header followed by the data
   * @throws RemoteException if the remote call fails
   */
  byte[] read(String path, long position, int len) throws RemoteException;

  /**
   * Reads <code>len</code> bytes from the remote file specified by <code>fileId</code>,
   * starting at <code>position</code>. The entire returned buffer contains raw data. The
   * <code>fileId</code> is obtained from the path-based overload of <code>read</code>.
   *
   * @param fileId ID returned in the header of a path-based read
   * @param position offset in the file at which reading starts
   * @param len number of data bytes requested
   * @return the data, without any header
   * @throws RemoteException if the remote call fails
   */
  byte[] read(int fileId, long position, int len) throws RemoteException;

  /**
   * Closes the remote file.
   *
   * @param fileId ID returned in the header of a path-based read
   * @throws RemoteException if the remote call fails
   */
  void close(int fileId) throws RemoteException;
}
multiple readers and one writer. Protected resource here is the openFiles table + // Readers are RMI threads serving index data streaming. It's safe for multiple RMI threads to access the + // openFiles table since they never share the same file handler. + // Writer is the Prune timer to prune expired open files + private final ReadWriteLock rwl = new ReentrantReadWriteLock(); + private final Lock r = rwl.readLock(); + private final Lock w = rwl.writeLock(); + + RemoteFileReaderImpl() throws RemoteException { + super(rmiPort); + + System.out.println(Thread.currentThread().getName() + ": instantiating RemoteFileReaderImpl on rmi port=" + rmiPort); + } + + private synchronized int getNextFileId() { + return globalFileId++; + } + + @Override + public void close(int fileId) throws RemoteException { + try { + r.lock(); + RandomAccessFileWrapper fileWrapper = openFiles.get(fileId); + if (fileWrapper != null) { + System.out.println(String.format("%s: Closing file=%s", Thread.currentThread().getName(), fileWrapper.toString())); + fileWrapper.file.close(); + openFiles.remove(fileId); + } else { + System.out.println("close fileId=" + fileId + " does not exist!"); + } + } catch (IOException e) { + throw new RemoteException(e.getMessage()); + } finally { + r.unlock(); + } + } + + public void pruneExpiredOpenFiles() { + try { + System.out.println(Thread.currentThread().getName() + ": Checking for pruning"); + w.lock(); + Set fileIds = openFiles.keySet(); + for (int fileId : fileIds) { + RandomAccessFileWrapper fileWrapper = openFiles.get(fileId); + System.out.println(String.format( + "%s: Checking for pruning file=%s", Thread.currentThread().getName(), fileWrapper.toString())); + if (fileWrapper.expire()) { + System.out.println(String.format( + "%s: Closing expired file=%s", Thread.currentThread().getName(), fileWrapper.toString())); + openFiles.remove(fileId); + } + } + } finally { + w.unlock(); + } + } + + private void _read(RandomAccessFileWrapper fileWrapper, long 
fileOffset, int len, byte[] buf, int bufOffset) throws IOException { + + System.out.println(String.format("%s: Streaming %d bytes offset=%d file=%s", Thread.currentThread().getName(), len, fileOffset, fileWrapper.toString())); + + int total = 0; + fileWrapper.resetTimeStamp(); + RandomAccessFile file = fileWrapper.file; + file.seek(fileOffset); + + while (total < len) { + int toRead = Math.min(CHUNK_SIZE, len - total); + int i = file.read(buf, bufOffset + total, toRead); + total += i; + } + assert total == len; + } + + @Override + public byte[] read(String path, long offset, int len) throws RemoteException { + try { + r.lock(); + RandomAccessFile file; + file = new RandomAccessFile(path, "r"); + int fileId = getNextFileId(); + RandomAccessFileWrapper fileWrapper = new RandomAccessFileWrapper(fileId, file, path); + openFiles.put(fileId, fileWrapper); + + byte[] bytes = new byte[INTEGER_SIZE + len]; + bytes[0] = (byte) ((fileId >> 24) & 0xFF); + bytes[1] = (byte) ((fileId >> 16) & 0xFF); + bytes[2] = (byte) ((fileId >> 8) & 0xFF); + bytes[3] = (byte) (fileId & 0xFF); + _read(fileWrapper, offset, len, bytes, INTEGER_SIZE); + + return bytes; + } catch (IOException e) { + throw new RemoteException(e.getMessage()); + } finally { + r.unlock(); + } + + } + + @Override + public byte[] read(int fileId, long offset, int len) throws RemoteException { + try { + r.lock(); + RandomAccessFileWrapper fileWrapper = openFiles.get(fileId); + if (fileWrapper == null) { + throw new RemoteException("read fileId=" + fileId + " does not exist!"); + } + + byte[] bytes = new byte[len]; + _read(fileWrapper, offset, len, bytes, 0); + + return bytes; + } catch (IOException e) { + throw new RemoteException(e.getMessage()); + } finally { + r.unlock(); + } + } + + @Override + public String[] listAllFiles(String path) throws RemoteException { + File dir = new File(path); + return dir.list(new FilenameFilter() { + @Override + public boolean accept(File dir1, String file) { + return !new File(dir1, 
file).isDirectory(); + } + }); + } + + @Override + public boolean fileExist(String path) throws RemoteException { + File file = new File(path); + return file.exists(); + } + + @Override + public long fileLength(String path) throws RemoteException { + try { + File file = new File(path); + final long len = file.length(); + if (len == 0 && !file.exists()) { + throw new FileNotFoundException(path); + } else { + return len; + } + } catch (FileNotFoundException e) { + throw new RemoteException(e.getMessage()); + } + } + + /** + * Wrapper for RandomAccessFile. The main purpose is to keep track of file idle time. + * File should be closed if its idle time exceeds MAX_IDLE_TIME + * @author minh.nguyen + * + */ + static class RandomAccessFileWrapper { + static final int MAX_IDLE_TIME = 24*3600; + int id; + RandomAccessFile file; + String path; + long timeStamp; + + RandomAccessFileWrapper( + int id, + RandomAccessFile file, + String path) { + this.id = id; + this.file = file; + this.path = path; + this.timeStamp = System.currentTimeMillis(); + } + + @Override + public String toString() { + return "[id=" + id + ", path=" + path + + ", idle=" + idleTime() + " s]"; + } + + /** + * Client should call this method each time the file is accessed + */ + void resetTimeStamp() { + this.timeStamp = System.currentTimeMillis(); + } + + boolean expire() { + return idleTime() > MAX_IDLE_TIME; + } + + /** + * Client should call resetTimeStamp each time the file is accessed in order for this method + * to return the correct idle time elapse (in seconds) + * @return time elapse in seconds since the last time the file was accessed + */ + long idleTime() { + return (System.currentTimeMillis() - timeStamp) / 1000; + } + } + +} \ No newline at end of file diff --git a/src/main/java/org/getopt/luke/plugins/RemoteFileReaderServer.java b/src/main/java/org/getopt/luke/plugins/RemoteFileReaderServer.java new file mode 100644 index 0000000..105b738 --- /dev/null +++ 
b/src/main/java/org/getopt/luke/plugins/RemoteFileReaderServer.java @@ -0,0 +1,36 @@ +package org.getopt.luke.plugins; + +import java.net.MalformedURLException; +import java.rmi.Naming; +import java.rmi.RemoteException; + +public class RemoteFileReaderServer { + + private static final int PRUNE_PERIOD = 3600000; + + public static void main(String[] args) { + try { + final RemoteFileReaderImpl fileReader = new RemoteFileReaderImpl(); + Naming.rebind("remotefilereader", fileReader); + System.out.println("RemoteFileReader Server ready."); + + // Periodic thread to prune expired open files + // TODO: to be enabled based on configuration + /*Timer timer = new Timer(); + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + fileReader.pruneExpiredOpenFiles(); + } + }, 0, PRUNE_PERIOD);*/ + + } catch (RemoteException e) { + System.out.println("Exception in RemoteFileReaderImpl.main: " + e); + } catch (MalformedURLException e) { + System.out.println("MalformedURLException " + e); + } + + } + + +} \ No newline at end of file diff --git a/src/main/java/org/getopt/luke/plugins/RemoteIndexPlugin.java b/src/main/java/org/getopt/luke/plugins/RemoteIndexPlugin.java new file mode 100644 index 0000000..f25e497 --- /dev/null +++ b/src/main/java/org/getopt/luke/plugins/RemoteIndexPlugin.java @@ -0,0 +1,119 @@ +package org.getopt.luke.plugins; + +import org.apache.hadoop.util.StringUtils; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.getopt.luke.LukePlugin; +import org.getopt.luke.SlowThread; + +/** + * Source: https://github.com/md6nguyen/luke-remoteindex/blob/master/RemoteIndexPlugin.java + */ +public class RemoteIndexPlugin extends LukePlugin { + + private static final String DEFAULT_INDEX_URI = "host:path"; + private String lastMsg = "?"; + private Object btOpen; + private Object status; + private Object indexUri; + private Object total; + private Object bufSize; + private IndexReader myIr = 
null; + private int bufferSize = 4096; + private boolean opening = false; + + @Override + public String getPluginHome() { + return "mailto:minh.nguyen@jivesoftware.com"; + } + + @Override + public String getPluginInfo() { + return "Open indexes located on remote filesystem."; + } + + @Override + public String getPluginName() { + return "RemoteIndex Plugin"; + } + + @Override + public String getXULName() { + return "/xml/remoteindex.xml"; + } + + @Override + public boolean init() throws Exception { + status = app.find(myUi, "status"); + app.setString(status, "text", lastMsg); + btOpen = app.find(myUi, "btOpen"); + indexUri = app.find(myUi, "indexUri"); + total = app.find(myUi, "totalBytes"); + bufSize = app.find(myUi, "bufSize"); + if (ir != myIr) { + // reset ui + lastMsg = "?"; + app.setString(total, "text", ""); + } + app.setString(status, "text", lastMsg); + + String indexUriVal = app.getString(indexUri, "text"); + String indexUriEnv = System.getenv("indexUri"); + if ((isEmptyString(indexUriVal) || DEFAULT_INDEX_URI.equals(indexUriVal)) && !isEmptyString(indexUriEnv)) { + app.setString(indexUri, "text", indexUriEnv); + } + + return false; + } + + private static boolean isEmptyString(String str) { + return str == null || "".equals(str); + } + + public void actionOpen() { + final String uriTxt = app.getString(indexUri, "text"); + if (uriTxt.trim().length() == 0) { + lastMsg = "Empty index path."; + app.errorMsg(lastMsg); + return; + } + try { + bufferSize = Integer.parseInt(app.getString(bufSize, "text")); + } catch (Exception e) { + // + } + SlowThread st = new SlowThread(app) { + public void execute() { + openIndex(uriTxt); + } + }; + st.start(); + } + + private void openIndex(String uriTxt) { + opening = true; + myIr = null; + try { + String[] tokens = uriTxt.trim().split(":"); + String host = tokens[0]; + String path = tokens[1]; + IndexReader r = null; + app.setString(status, "text", "Opening remote index"); + RemoteFileReader fileReader = 
RemoteFSTool.createRemoteFileReader(host, "1415"); + RemoteFSDirectory fsdir = new RemoteFSDirectory(fileReader, path); + RemoteFSDirectory.setBufferSize(bufferSize); + r = DirectoryReader.open(fsdir); + + myIr = r; + app.setSlowAccess(true); + app.setIndexReader(r, uriTxt); + app.setString(status, "text", "Remote Index Opened Successfully"); + app.showStatus("Remote Index Opened Successfully"); + } catch (Exception e) { + app.errorMsg("Error: " + StringUtils.stringifyException(e)); + } finally { + opening = false; + } + } + +} \ No newline at end of file diff --git a/src/main/resources/xml/remoteindex.xml b/src/main/resources/xml/remoteindex.xml new file mode 100644 index 0000000..8995d44 --- /dev/null +++ b/src/main/resources/xml/remoteindex.xml @@ -0,0 +1,21 @@ + + +