Skip to content

Commit

Permalink
Some edge cases of single thread
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Aug 22, 2023
1 parent 05d2f4c commit a222446
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public static CompressionScheme getScheme(CompressedMatrixBlock cmb) {
}

public CompressedMatrixBlock updateAndEncode(MatrixBlock mb, int k) {
if(k == 1)
if(k == 1 || mb.getInMemorySize() < 1000 * 20 * 8)
return updateAndEncode(mb);

final ExecutorService pool = CommonThreadPool.get(k);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public final class WriterCompressed extends MatrixWriter {
protected static JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());

private String fname;

private FileSystem fs;
private Future<Writer>[] writers;
private Lock[] writerLocks;

Expand Down Expand Up @@ -137,10 +139,14 @@ private void write(MatrixBlock src, final String fname, final int blen) throws I
jobUse = 0;
}


if(this.fname != fname) {
this.fname = fname;
this.writers = null;
}

fs = IOUtilFunctions.getFileSystem(new Path(fname), job);

final int k = OptimizerUtils.getParallelBinaryWriteParallelism();
final int rlen = src.getNumRows();
final int clen = src.getNumColumns();
Expand All @@ -158,18 +164,20 @@ else if(!(src instanceof CompressedMatrixBlock))
}

private void writeSingleBlock(MatrixBlock b, int k) throws IOException {
Writer w = getWriter(fname);
final Path path = new Path(fname);
Writer w = generateWriter(job, path, fs);
MatrixIndexes idx = new MatrixIndexes(1, 1);
if(!(b instanceof CompressedMatrixBlock))
b = CompressedMatrixBlockFactory.compress(b, k).getLeft();
w.append(idx, new CompressedWriteBlock(b));
IOUtilFunctions.closeSilently(w);
cleanup();
cleanup(path);
}

private void writeMultiBlockUncompressed(MatrixBlock b, final int rlen, final int clen, final int blen, int k)
throws IOException {
Writer w = getWriter(fname);
final Path path = new Path(fname);
Writer w = generateWriter(job, path, fs);
final MatrixIndexes indexes = new MatrixIndexes();
LOG.warn("Writing compressed format with non identical compression scheme");

Expand All @@ -187,7 +195,7 @@ private void writeMultiBlockUncompressed(MatrixBlock b, final int rlen, final in
}
}
IOUtilFunctions.closeSilently(w);
cleanup();
cleanup(path);
}

private void writeMultiBlockCompressed(MatrixBlock b, final int rlen, final int clen, final int blen, int k)
Expand All @@ -205,7 +213,8 @@ private void writeMultiBlockCompressedSingleThread(MatrixBlock mb, final int rle
try {

setupWrite();
Writer w = getWriter(fname);
final Path path = new Path(fname);
Writer w = generateWriter(job, path, fs);
for(int bc = 0; bc * blen < clen; bc++) {// column blocks
final int sC = bc * blen;
final int mC = Math.min(sC + blen, clen) - 1;
Expand All @@ -221,6 +230,7 @@ private void writeMultiBlockCompressedSingleThread(MatrixBlock mb, final int rle

}
IOUtilFunctions.closeSilently(w);
cleanup(path);
}
catch(Exception e) {
throw new IOException(e);
Expand All @@ -244,7 +254,7 @@ private void writeMultiBlockCompressedParallel(MatrixBlock b, final int rlen, fi
final int j = i;
if(writers[i] == null) {
writers[i] = pool.submit(() -> {
return generateWriter(job, getPath(j));
return generateWriter(job, getPath(j), fs);
});
}
writerLocks[i] = new ReentrantLock();
Expand Down Expand Up @@ -282,7 +292,7 @@ private void writeMultiBlockCompressedParallel(MatrixBlock b, final int rlen, fi
pool.submit(() -> {
try {
IOUtilFunctions.closeSilently(writers[l].get());
cleanup(job, getPath(l));
cleanup(job, getPath(l), fs);
}
catch(Exception e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -310,26 +320,24 @@ private Path getPath(int id) {
return new Path(fname, IOUtilFunctions.getPartFileName(id));
}

private Writer getWriter(String fname) throws IOException {
final Path path = new Path(fname);
return generateWriter(job, path);
}
// private Writer getWriter(String fname) throws IOException {
// final Path path = new Path(fname);
// return generateWriter(job, path);
// }

private static Writer generateWriter(JobConf job, Path path) throws IOException {
private static Writer generateWriter(JobConf job, Path path, FileSystem fs) throws IOException {

return SequenceFile.createWriter(job, Writer.file(path), Writer.bufferSize(4096),
Writer.keyClass(MatrixIndexes.class), Writer.valueClass(CompressedWriteBlock.class),
Writer.compression(SequenceFile.CompressionType.NONE), // No Compression type on disk
Writer.replication((short) 1));
}

private void cleanup() throws IOException {
final Path path = new Path(fname);
cleanup(job, path);
private void cleanup(Path p) throws IOException {
cleanup(job, p, fs);
}

private static void cleanup(JobConf job, Path path) throws IOException {
final FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
private static void cleanup(JobConf job, Path path, FileSystem fs) throws IOException {
IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
}

Expand Down Expand Up @@ -400,7 +408,7 @@ public Object call() throws Exception {
Writer.replication((short) 1))) {
w.append(new DictWritable.K(id), new DictWritable(dicts));
}
cleanup(job, p);
cleanup(job, p, fs);
return null;

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public WriterBinaryBlock(int replication) {
_replication = replication;

jobUse ++;
if(jobUse > 30){
if(jobUse > 15){
job = new JobConf(ConfigurationManager.getCachedJobConf());
jobUse = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.HDFSTool;

public class WriterBinaryBlockParallel extends WriterBinaryBlock
{
Expand All @@ -49,6 +47,7 @@ protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, MatrixBlock
int numPartFiles = (int)(OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen,
blen, src.getNonZeros()) / InfrastructureAnalyzer.getBlockSize(path.getFileSystem(job)));
numPartFiles = Math.max(numPartFiles, 1);
numPartFiles = Math.min(numPartFiles,(int)( Math.ceil((double)rlen / blen) * Math.ceil((double)clen / blen) ));

//determine degree of parallelism
int numThreads = OptimizerUtils.getParallelBinaryWriteParallelism();
Expand All @@ -61,7 +60,7 @@ protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, MatrixBlock
}

//create directory for concurrent tasks
HDFSTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
// HDFSTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);

//create and execute write tasks
final ExecutorService pool = CommonThreadPool.get(numThreads);
Expand Down

0 comments on commit a222446

Please sign in to comment.