Skip to content

Commit

Permalink
reader script
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Aug 23, 2023
1 parent e2b1183 commit d95d031
Show file tree
Hide file tree
Showing 13 changed files with 250 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,16 @@ public ICLAScheme update(MatrixBlock data, IColIndex columns) {
map.increment(emptyRow, row - r);
r = row;
}
final int id = map.getId(cellVals);
if(id >= 0)
map.increment(cellVals);
try{
if(!cellVals.equals(def)){
final int id = map.getId(cellVals);
if(id >= 0)
map.increment(cellVals);
}
}
catch(Exception e){
throw new RuntimeException(cellVals + " " + map, e);
}

}
if(!defIsEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public boolean isInPlace() {
* @param file file name
*/
public MatrixObject(ValueType vt, String file) {
this(vt, file, null); // HDFS file path
this(vt, file, null, null); // HDFS file path
}

/**
Expand All @@ -107,11 +107,7 @@ public MatrixObject(ValueType vt, String file) {
* @param mtd metadata
*/
public MatrixObject(ValueType vt, String file, MetaData mtd) {
super(DataType.MATRIX, vt);
_metaData = mtd;
_hdfsFileName = file;
_cache = null;
_data = null;
this(vt, file, mtd, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,24 +668,18 @@ else if( cb instanceof FrameBlock )
}

public static MatrixObject createMatrixObject(MatrixBlock mb) {
MatrixObject ret = new MatrixObject(Types.ValueType.FP64,
OptimizerUtils.getUniqueTempFileName());
ret.acquireModify(mb);
ret.setMetaData(new MetaDataFormat(new MatrixCharacteristics(
mb.getNumRows(), mb.getNumColumns()), FileFormat.BINARY));
ret.getMetaData().getDataCharacteristics()
.setBlocksize(ConfigurationManager.getBlocksize());
ret.release();
MetaData md = new MetaDataFormat(
new MatrixCharacteristics(mb.getNumRows(), mb.getNumColumns(), ConfigurationManager.getBlocksize()),
FileFormat.BINARY);
MatrixObject ret = new MatrixObject(Types.ValueType.FP64, OptimizerUtils.getUniqueTempFileName(), md);
return ret;
}

public static MatrixObject createMatrixObject(DataCharacteristics dc) {
MatrixObject ret = new MatrixObject(Types.ValueType.FP64,
OptimizerUtils.getUniqueTempFileName());
ret.setMetaData(new MetaDataFormat(new MatrixCharacteristics(
dc.getRows(), dc.getCols()), FileFormat.BINARY));
ret.getMetaData().getDataCharacteristics()
.setBlocksize(ConfigurationManager.getBlocksize());
MetaData md = new MetaDataFormat(
new MatrixCharacteristics(dc.getRows(), dc.getCols(), ConfigurationManager.getBlocksize()),
FileFormat.BINARY);
MatrixObject ret = new MatrixObject(Types.ValueType.FP64, OptimizerUtils.getUniqueTempFileName(), md);
return ret;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.hops.OptimizerUtils;
Expand All @@ -44,11 +45,8 @@ protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, MatrixBlock
throws IOException, DMLRuntimeException
{
//estimate output size and number of output blocks (min 1)
int numPartFiles = (int)(OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen,
blen, src.getNonZeros()) / InfrastructureAnalyzer.getBlockSize(path.getFileSystem(job)));
numPartFiles = Math.max(numPartFiles, 1);
numPartFiles = Math.min(numPartFiles,(int)( Math.ceil((double)rlen / blen) * Math.ceil((double)clen / blen) ));

int numPartFiles = numPartsFiles(path.getFileSystem(job), rlen, clen, blen , src.getNonZeros());

//determine degree of parallelism
int numThreads = OptimizerUtils.getParallelBinaryWriteParallelism();
numThreads = Math.min(numThreads, numPartFiles);
Expand Down Expand Up @@ -85,6 +83,15 @@ protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, MatrixBlock
}
}

/**
 * Compute the number of part files to use when writing a binary block matrix.
 *
 * The count is the estimated partitioned size of the matrix divided by the block size reported for the given file
 * system, raised to at least 1, and then capped by the number of matrix blocks, i.e.
 * ceil(rlen / blen) * ceil(clen / blen). Because the cap is applied last, the result can be smaller than 1 when
 * the matrix contains no blocks.
 *
 * @param fs     file system whose block size is used as the divisor
 * @param rlen   number of rows
 * @param clen   number of columns
 * @param blen   block length used for both dimensions
 * @param nZeros number of non-zero values, forwarded to the size estimate
 * @return the number of part files to write
 */
public static int numPartsFiles(FileSystem fs, long rlen, long clen, long blen, long nZeros) {
	// size based estimate: how many file system blocks the partitioned matrix is expected to occupy
	final int bySize = (int) (OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, blen, nZeros) /
		InfrastructureAnalyzer.getBlockSize(fs));
	// upper bound: total number of matrix blocks
	final int blockCount = (int) (Math.ceil((double) rlen / blen) * Math.ceil((double) clen / blen));
	return Math.min(Math.max(bySize, 1), blockCount);
}

private class WriteFileTask implements Callable<Object>
{
private Path _path = null;
Expand Down
52 changes: 26 additions & 26 deletions src/main/java/org/apache/sysds/runtime/util/DataConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,20 @@ public static void writeTensorToHDFS(TensorBlock tensor, String dir, FileFormat
writer.writeTensorToHDFS(tensor, dir, blen);
}

public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat fmt, long rlen, long clen, int blen, boolean localFS)
throws IOException
{
ReadProperties prop = new ReadProperties();
// public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat fmt, long rlen, long clen, int blen, boolean localFS)
// throws IOException
// {
// ReadProperties prop = new ReadProperties();

prop.path = dir;
prop.fmt = fmt;
prop.rlen = rlen;
prop.clen = clen;
prop.blen = blen;
prop.localFS = localFS;
// prop.path = dir;
// prop.fmt = fmt;
// prop.rlen = rlen;
// prop.clen = clen;
// prop.blen = blen;
// prop.localFS = localFS;

return readMatrixFromHDFS(prop);
}
// return readMatrixFromHDFS(prop);
// }

public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat fmt, long rlen, long clen, int blen)
throws IOException
Expand Down Expand Up @@ -156,22 +156,22 @@ public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat fmt, long rl
return readMatrixFromHDFS(prop);
}

public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat fmt, long rlen, long clen,
int blen, long expectedNnz, boolean localFS)
throws IOException
{
ReadProperties prop = new ReadProperties();
// public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat fmt, long rlen, long clen,
// int blen, long expectedNnz, boolean localFS)
// throws IOException
// {
// ReadProperties prop = new ReadProperties();

prop.path = dir;
prop.fmt = fmt;
prop.rlen = rlen;
prop.clen = clen;
prop.blen = blen;
prop.expectedNnz = expectedNnz;
prop.localFS = localFS;
// prop.path = dir;
// prop.fmt = fmt;
// prop.rlen = rlen;
// prop.clen = clen;
// prop.blen = blen;
// prop.expectedNnz = expectedNnz;
// prop.localFS = localFS;

return readMatrixFromHDFS(prop);
}
// return readMatrixFromHDFS(prop);
// }

public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat fmt, long rlen, long clen,
int blen, long expectedNnz, FileFormatProperties formatProperties)
Expand Down
35 changes: 30 additions & 5 deletions src/test/java/org/apache/sysds/performance/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import org.apache.sysds.performance.compression.StreamCompress;
import org.apache.sysds.performance.generators.ConstMatrix;
import org.apache.sysds.performance.generators.GenMatrices;
import org.apache.sysds.performance.generators.IGenerate;
import org.apache.sysds.performance.generators.MatrixFile;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;

public class Main {

private static void exec(int prog, String[] args) throws InterruptedException, Exception {
private static void exec(int prog, String[] args) throws Exception {
switch(prog) {
case 1:
new StreamCompress(100, new GenMatrices(10000, 100, 32, 1.0)).run();
Expand Down Expand Up @@ -79,12 +82,15 @@ private static void exec(int prog, String[] args) throws InterruptedException, E
case 12:
run11(args, Integer.parseInt(args[7]));
break;
case 13:
run13(args);
break;
default:
break;
}
}

private static void run9(String[] args) throws InterruptedException, Exception {
private static void run9(String[] args) throws Exception {
int rows = Integer.parseInt(args[1]);
int cols = Integer.parseInt(args[2]);
int unique = Integer.parseInt(args[3]);
Expand All @@ -94,7 +100,7 @@ private static void run9(String[] args) throws InterruptedException, Exception {
new IOBandwidth(n, new ConstMatrix(rows, cols, unique, sparsity), k).run();
}

private static void run10(String[] args) throws InterruptedException, Exception {
private static void run10(String[] args) throws Exception {
int rows = Integer.parseInt(args[1]);
int cols = Integer.parseInt(args[2]);
int unique = Integer.parseInt(args[3]);
Expand All @@ -104,7 +110,7 @@ private static void run10(String[] args) throws InterruptedException, Exception
new IOBandwidth(n, new ConstMatrix(rows, cols, unique, sparsity), k).runVector();
}

private static void run11(String[] args, int id) throws InterruptedException, Exception {
private static void run11(String[] args, int id) throws Exception {
int rows = Integer.parseInt(args[1]);
int cols = Integer.parseInt(args[2]);
int unique = Integer.parseInt(args[3]);
Expand All @@ -120,13 +126,32 @@ private static void run11(String[] args, int id) throws InterruptedException, Ex
s.run(id);
}

/**
 * Program 13: parse the command line arguments and forward them to run13A together with a generator created
 * from a file path.
 *
 * Expected layout: args[1] and args[2] are integers (forwarded as k and n respectively), args[3] is the path given
 * to MatrixFile.create, and args[4] is an integer id selecting which experiment to run.
 *
 * @param args command line arguments, at least 5 elements
 * @throws Exception if parsing fails or the delegated run fails
 */
private static void run13(String[] args) throws Exception {
	final int parallelArg = Integer.parseInt(args[1]);
	final int countArg = Integer.parseInt(args[2]);
	final String path = args[3];
	final int experimentId = Integer.parseInt(args[4]);
	// the generator is created only after all arguments were parsed successfully
	final IGenerate<MatrixBlock> generator = MatrixFile.create(path);
	run13A(countArg, generator, parallelArg, experimentId);
}

/**
 * Build a Serialize experiment from the given generator and execute it.
 *
 * @param n  first argument handed to the Serialize constructor
 * @param g  generator supplying the matrix blocks
 * @param k  third argument handed to the Serialize constructor
 * @param id experiment to run; -1 invokes the no-argument run(), any other value is passed to run(int)
 * @throws Exception if the experiment fails
 */
private static void run13A(int n, IGenerate<MatrixBlock> g, int k, int id) throws Exception {
	final Serialize experiment = new Serialize(n, g, k);
	if(id != -1)
		experiment.run(id);
	else
		experiment.run();
}

public static void main(String[] args) {
try {
exec(Integer.parseInt(args[0]), args);
}
catch(Exception e) {
e.printStackTrace();
}finally{
}
finally {
CommonThreadPool.get().shutdown();
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/test/java/org/apache/sysds/performance/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,8 @@ With profiler:
```bash
java -jar -agentpath:$HOME/Programs/profiler/lib/libasyncProfiler.so=start,event=cpu,file=temp/log.html target/systemds-3.2.0-SNAPSHOT-perf.jar 12 10000 100 4 1.0 16 1000 -1
```


```bash
java -jar -agentpath:$HOME/Programs/profiler/lib/libasyncProfiler.so=start,event=cpu,file=temp/log.html target/systemds-3.2.0-SNAPSHOT-perf.jar 13 16 100 "temp/test.csv" -1
```
Original file line number Diff line number Diff line change
Expand Up @@ -75,31 +75,31 @@ public void run() throws Exception, InterruptedException {
if(!directory.exists()) {
directory.mkdir();
}

if(k == 1) {
ConfigurationManager.getCompilerConfig().set(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS, false);
}

warmup(() -> sumTask(k), N);
execute(() -> writeUncompressed(k), "Serialize");

// execute(() -> writeUncompressed(k), "Serialize");
// execute(() -> diskUncompressed(k), "CustomDisk");

execute(() -> standardIO(k), () -> setFileSize(),() -> cleanup(), "StandardDisk");


execute(() -> compressTask(k), "Compress Normal");
execute(() -> standardIO(k), () -> setFileSize(), () -> cleanup(), "StandardDisk");

// execute(() -> compressTask(k), "Compress Normal");
// execute(() -> writeCompressTask(k), "Compress Normal Serialize");
// execute(() -> diskCompressTask(k), "Compress Normal CustomDisk");

execute(() -> standardCompressedIO(k), () -> setFileSize(),() -> cleanup(), "Compress StandardIO");


execute(() -> standardCompressedIO(k), () -> setFileSize(), () -> cleanup(), "Compress StandardIO");

final CompressionScheme sch2 = CLALibScheme.getScheme(getC());
execute(() -> updateAndApplySchemeFused(sch2, k), "Update&Apply Scheme Fused");
// execute(() -> updateAndApplySchemeFused(sch2, k), "Update&Apply Scheme Fused");
// execute(() -> writeUpdateAndApplySchemeFused(sch2, k), "Update&Apply Scheme Fused Serialize");
// execute(() -> diskUpdateAndApplySchemeFused(sch2, k), "Update&Apply Scheme Fused Disk");

execute(() -> standardCompressedIOUpdateAndApply(sch2, k), () -> setFileSize(), () -> cleanup(), "Update&Apply Standard IO");

execute(() -> standardCompressedIOUpdateAndApply(sch2, k), () -> setFileSize(), () -> cleanup(),
"Update&Apply Standard IO");
}

public void run(int i) throws Exception, InterruptedException {
Expand All @@ -109,7 +109,7 @@ public void run(int i) throws Exception, InterruptedException {
ConfigurationManager.getCompilerConfig().set(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS, false);
}

final CompressionScheme sch = CLALibScheme.getScheme(getC());
final CompressionScheme sch = (i == 8 || i == 9 || i == 10 || i == 11) ? CLALibScheme.getScheme(getC()) : null;
cleanup();
switch(i) {
case 1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ public String toString() {
}

@Override
public boolean isEmpty() {
public final boolean isEmpty() {
return false;
}

@Override
public int defaultWaitTime() {
public final int defaultWaitTime() {
return 0;
}

Expand Down
Loading

0 comments on commit d95d031

Please sign in to comment.