diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/CompressionScheme.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/CompressionScheme.java index 92f78779e3f..049d2a2afd3 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/CompressionScheme.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/CompressionScheme.java @@ -183,7 +183,7 @@ public static CompressionScheme getScheme(CompressedMatrixBlock cmb) { } public CompressedMatrixBlock updateAndEncode(MatrixBlock mb, int k) { - if(k == 1) + if(k == 1 || mb.getInMemorySize() < 1000 * 20 * 8) return updateAndEncode(mb); final ExecutorService pool = CommonThreadPool.get(k); diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java index e98fe443a01..744a5e945a5 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java +++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java @@ -65,6 +65,8 @@ public final class WriterCompressed extends MatrixWriter { protected static JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); private String fname; + + private FileSystem fs; private Future[] writers; private Lock[] writerLocks; @@ -137,10 +139,14 @@ private void write(MatrixBlock src, final String fname, final int blen) throws I jobUse = 0; } + if(this.fname != fname) { this.fname = fname; this.writers = null; } + + fs = IOUtilFunctions.getFileSystem(new Path(fname), job); + final int k = OptimizerUtils.getParallelBinaryWriteParallelism(); final int rlen = src.getNumRows(); final int clen = src.getNumColumns(); @@ -158,18 +164,20 @@ else if(!(src instanceof CompressedMatrixBlock)) } private void writeSingleBlock(MatrixBlock b, int k) throws IOException { - Writer w = getWriter(fname); + final Path path = new Path(fname); + Writer w = generateWriter(job, path, fs); MatrixIndexes idx = new MatrixIndexes(1, 1); if(!(b instanceof CompressedMatrixBlock)) b = CompressedMatrixBlockFactory.compress(b, k).getLeft(); w.append(idx, new CompressedWriteBlock(b)); IOUtilFunctions.closeSilently(w); - cleanup(); + cleanup(path); } private void writeMultiBlockUncompressed(MatrixBlock b, final int rlen, final int clen, final int blen, int k) throws IOException { - Writer w = getWriter(fname); + final Path path = new Path(fname); + Writer w = generateWriter(job, path, fs); final MatrixIndexes indexes = new MatrixIndexes(); LOG.warn("Writing compressed format with non identical compression scheme"); @@ -187,7 +195,7 @@ private void writeMultiBlockUncompressed(MatrixBlock b, final int rlen, final in } } IOUtilFunctions.closeSilently(w); - cleanup(); + cleanup(path); } private void writeMultiBlockCompressed(MatrixBlock b, final int rlen, final int clen, final int blen, int k) @@ -205,7 +213,8 @@ private void writeMultiBlockCompressedSingleThread(MatrixBlock mb, final int rle try { setupWrite(); - Writer w = getWriter(fname); + final Path path = new Path(fname); + Writer w = generateWriter(job, path, fs); for(int bc = 0; bc * blen < clen; bc++) {// column blocks final int sC = bc * blen; final int mC = Math.min(sC + blen, clen) - 1; @@ -221,6 +230,7 @@ private void writeMultiBlockCompressedSingleThread(MatrixBlock mb, final int rle } IOUtilFunctions.closeSilently(w); + cleanup(path); } catch(Exception e) { throw new IOException(e); @@ -244,7 +254,7 @@ private void writeMultiBlockCompressedParallel(MatrixBlock b, final int rlen, fi final int j = i; if(writers[i] == null) { writers[i] = pool.submit(() -> { - return generateWriter(job, getPath(j)); + return generateWriter(job, getPath(j), fs); }); } writerLocks[i] = new ReentrantLock(); @@ -282,7 +292,7 @@ private void writeMultiBlockCompressedParallel(MatrixBlock b, final int rlen, fi pool.submit(() -> { try { IOUtilFunctions.closeSilently(writers[l].get()); - cleanup(job, getPath(l)); + cleanup(job, getPath(l), fs); } catch(Exception e) { throw new RuntimeException(e); @@ -310,12 +320,12 @@ private Path getPath(int id) { return new Path(fname, IOUtilFunctions.getPartFileName(id)); } - private Writer getWriter(String fname) throws IOException { - final Path path = new Path(fname); - return generateWriter(job, path); - } + // private Writer getWriter(String fname) throws IOException { + // final Path path = new Path(fname); + // return generateWriter(job, path); + // } - private static Writer generateWriter(JobConf job, Path path) throws IOException { + private static Writer generateWriter(JobConf job, Path path, FileSystem fs) throws IOException { return SequenceFile.createWriter(job, Writer.file(path), Writer.bufferSize(4096), Writer.keyClass(MatrixIndexes.class), Writer.valueClass(CompressedWriteBlock.class), @@ -323,13 +333,11 @@ private static Writer generateWriter(JobConf job, Path path) throws IOException Writer.replication((short) 1)); } - private void cleanup() throws IOException { - final Path path = new Path(fname); - cleanup(job, path); + private void cleanup(Path p) throws IOException { + cleanup(job, p, fs); } - private static void cleanup(JobConf job, Path path) throws IOException { - final FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + private static void cleanup(JobConf job, Path path, FileSystem fs) throws IOException { IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); } @@ -400,7 +408,7 @@ public Object call() throws Exception { Writer.replication((short) 1))) { w.append(new DictWritable.K(id), new DictWritable(dicts)); } - cleanup(job, p); + cleanup(job, p, fs); return null; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java index 597e946de98..af3bf468d5b 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java @@ -44,7 +44,7 @@ public WriterBinaryBlock(int replication) { _replication = replication; jobUse ++; - if(jobUse > 30){ + if(jobUse > 15){ job = new JobConf(ConfigurationManager.getCachedJobConf()); jobUse = 0; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java index c1cb0c627e7..6e399a62989 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java @@ -27,13 +27,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; -import org.apache.sysds.conf.DMLConfig; import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.CommonThreadPool; -import org.apache.sysds.runtime.util.HDFSTool; public class WriterBinaryBlockParallel extends WriterBinaryBlock { @@ -49,6 +47,7 @@ protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, MatrixBlock int numPartFiles = (int)(OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, blen, src.getNonZeros()) / InfrastructureAnalyzer.getBlockSize(path.getFileSystem(job))); numPartFiles = Math.max(numPartFiles, 1); + numPartFiles = Math.min(numPartFiles,(int)( Math.ceil((double)rlen / blen) * Math.ceil((double)clen / blen) )); //determine degree of parallelism int numThreads = OptimizerUtils.getParallelBinaryWriteParallelism(); @@ -61,7 +60,7 @@ protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, MatrixBlock } //create directory for concurrent tasks - HDFSTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); + // HDFSTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); //create and execute write tasks final ExecutorService pool = CommonThreadPool.get(numThreads);