diff --git a/pom.xml b/pom.xml index dbe82b9ddb1..7c9cf2920ed 100644 --- a/pom.xml +++ b/pom.xml @@ -236,7 +236,7 @@ org.apache.sysds.performance.Main - SystemDS.jar SystemDS-tests.jar + SystemDS.jar ${project.build.directory}/${project.artifactId}-${project.version}-tests.jar diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java index 63e632c895c..4dabbc993de 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java @@ -35,6 +35,7 @@ import org.apache.sysds.hops.fedplanner.FTypes.FType; import org.apache.sysds.lops.Lop; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysds.runtime.controlprogram.federated.FederatedRange; @@ -592,6 +593,11 @@ protected MatrixBlock reconstructByLineage(LineageItem li) throws IOException { .acquireReadAndRelease(); } + @Override + public boolean isCompressed(){ + return _partitionInMemory instanceof CompressedMatrixBlock || super.isCompressed(); + } + @Override public String toString(){ StringBuilder sb = new StringBuilder(super.toString()); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java index c17fcb0ad40..93207ce06af 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java @@ -65,35 +65,33 @@ public static AggregateBinaryCPInstruction parseInstruction(String str) { if(numFields == 6) { boolean isLeftTransposed = Boolean.parseBoolean(parts[5]); boolean isRightTransposed = Boolean.parseBoolean(parts[6]); - return new AggregateBinaryCPInstruction(aggbin, - in1, in2, out, opcode, str, isLeftTransposed, isRightTransposed); + return new AggregateBinaryCPInstruction(aggbin, in1, in2, out, opcode, str, isLeftTransposed, + isRightTransposed); } return new AggregateBinaryCPInstruction(aggbin, in1, in2, out, opcode, str); } @Override public void processInstruction(ExecutionContext ec) { + MatrixBlock matBlock1 = ec.getMatrixInput(input1.getName()); + MatrixBlock matBlock2 = ec.getMatrixInput(input2.getName()); // check compressed inputs - final boolean comp1 = ec.getMatrixObject(input1.getName()).isCompressed(); - final boolean comp2 = ec.getMatrixObject(input2.getName()).isCompressed(); + final boolean comp1 = matBlock1 instanceof CompressedMatrixBlock; + final boolean comp2 = matBlock2 instanceof CompressedMatrixBlock; + if(comp1 || comp2) - processCompressedAggregateBinary(ec, comp1, comp2); + processCompressedAggregateBinary(ec, matBlock1, matBlock2, comp1, comp2); else if(transposeLeft || transposeRight) - processTransposedFusedAggregateBinary(ec); + processTransposedFusedAggregateBinary(ec, matBlock1, matBlock2); else - processNormal(ec); - } + processNormal(ec, matBlock1, matBlock2); - private void processNormal(ExecutionContext ec) { - // get inputs - MatrixBlock matBlock1 = ec.getMatrixInput(input1.getName()); - MatrixBlock matBlock2 = ec.getMatrixInput(input2.getName()); + } + private void processNormal(ExecutionContext ec, MatrixBlock matBlock1, MatrixBlock matBlock2) { // compute matrix multiplication AggregateBinaryOperator ab_op = (AggregateBinaryOperator) _optr; - MatrixBlock ret; - - ret = matBlock1.aggregateBinaryOperations(matBlock1, matBlock2, new MatrixBlock(), ab_op); + MatrixBlock ret = matBlock1.aggregateBinaryOperations(matBlock1, matBlock2, new MatrixBlock(), ab_op); // release inputs/outputs ec.releaseMatrixInput(input1.getName()); @@ -101,9 +99,9 @@ private void processNormal(ExecutionContext ec) { ec.setMatrixOutput(output.getName(), ret); } - private void processTransposedFusedAggregateBinary(ExecutionContext ec) { - MatrixBlock matBlock1 = ec.getMatrixInput(input1.getName()); - MatrixBlock matBlock2 = ec.getMatrixInput(input2.getName()); + private void processTransposedFusedAggregateBinary(ExecutionContext ec, MatrixBlock matBlock1, + MatrixBlock matBlock2) { + // compute matrix multiplication AggregateBinaryOperator ab_op = (AggregateBinaryOperator) _optr; MatrixBlock ret; @@ -127,9 +125,9 @@ private void processTransposedFusedAggregateBinary(ExecutionContext ec) { ec.setMatrixOutput(output.getName(), ret); } - private void processCompressedAggregateBinary(ExecutionContext ec, boolean c1, boolean c2) { - MatrixBlock matBlock1 = ec.getMatrixInput(input1.getName()); - MatrixBlock matBlock2 = ec.getMatrixInput(input2.getName()); + private void processCompressedAggregateBinary(ExecutionContext ec, MatrixBlock matBlock1, MatrixBlock matBlock2, + boolean c1, boolean c2) { + // compute matrix multiplication AggregateBinaryOperator ab_op = (AggregateBinaryOperator) _optr; MatrixBlock ret; diff --git a/src/test/java/org/apache/sysds/performance/Main.java b/src/test/java/org/apache/sysds/performance/Main.java index 17a87bf0c06..1e51a703bf7 100644 --- a/src/test/java/org/apache/sysds/performance/Main.java +++ b/src/test/java/org/apache/sysds/performance/Main.java @@ -19,17 +19,114 @@ package org.apache.sysds.performance; -import org.apache.sysds.performance.compression.SteamCompressTest; +import org.apache.sysds.performance.compression.IOBandwidth; +import org.apache.sysds.performance.compression.SchemaTest; +import org.apache.sysds.performance.compression.Serialize; +import org.apache.sysds.performance.compression.StreamCompress; +import org.apache.sysds.performance.generators.ConstMatrix; +import org.apache.sysds.performance.generators.GenMatrices; +import org.apache.sysds.runtime.util.CommonThreadPool; public class Main { + private static void exec(int prog, String[] args) throws InterruptedException, Exception { + switch(prog) { + case 1: + new StreamCompress(100, new GenMatrices(10000, 100, 32, 1.0)).run(); + break; + case 2: + new SchemaTest(100, new GenMatrices(10000, 1000, 32, 1.0)).run(); + break; + case 3: + new SchemaTest(100, new GenMatrices(1000, 1, 32, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 10, 32, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 100, 32, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 32, 1.0)).run(); + break; + case 4: + new SchemaTest(100, new GenMatrices(1000, 1000, 1, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 2, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 4, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 8, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 16, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 32, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 64, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 128, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 256, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 512, 1.0)).run(); + break; + case 5: + new SchemaTest(100, new ConstMatrix(1000, 100, 32, 1.0)).runCom(); + break; + case 6: + new SchemaTest(100, new GenMatrices(1000, 1000, 32, 0.3)).run(); + break; + case 7: + new SchemaTest(100, new ConstMatrix(1000, 1000, 32, 0.3)).runCom(); + break; + case 8: + new IOBandwidth(100, new ConstMatrix(1000, 1000, 32, 1.0)).run(); + break; + case 9: + run9(args); + break; + case 10: + run10(args); + break; + case 11: + run11(args, -1); + break; + case 12: + run11(args, Integer.parseInt(args[7])); + break; + default: + break; + } + } + + private static void run9(String[] args) throws InterruptedException, Exception { + int rows = Integer.parseInt(args[1]); + int cols = Integer.parseInt(args[2]); + int unique = Integer.parseInt(args[3]); + double sparsity = Double.parseDouble(args[4]); + int k = Integer.parseInt(args[5]); + int n = Integer.parseInt(args[6]); + new IOBandwidth(n, new ConstMatrix(rows, cols, unique, sparsity), k).run(); + } + + private static void run10(String[] args) throws InterruptedException, Exception { + int rows = Integer.parseInt(args[1]); + int cols = Integer.parseInt(args[2]); + int unique = Integer.parseInt(args[3]); + double sparsity = Double.parseDouble(args[4]); + int k = Integer.parseInt(args[5]); + int n = Integer.parseInt(args[6]); + new IOBandwidth(n, new ConstMatrix(rows, cols, unique, sparsity), k).runVector(); + } + + private static void run11(String[] args, int id) throws InterruptedException, Exception { + int rows = Integer.parseInt(args[1]); + int cols = Integer.parseInt(args[2]); + int unique = Integer.parseInt(args[3]); + double sparsity = Double.parseDouble(args[4]); + int k = Integer.parseInt(args[5]); + int n = Integer.parseInt(args[6]); + + Serialize s = new Serialize(n, new ConstMatrix(rows, cols, unique, sparsity), k); + + if(id == -1) + s.run(); + else + s.run(id); + } + public static void main(String[] args) { - try{ - SteamCompressTest.P1(); + try { + exec(Integer.parseInt(args[0]), args); + CommonThreadPool.get().shutdown(); } - catch(Exception e){ + catch(Exception e) { e.printStackTrace(); - } } } diff --git a/src/test/java/org/apache/sysds/performance/Util.java b/src/test/java/org/apache/sysds/performance/Util.java index 9bb2f8f9b8e..eb51b29af53 100644 --- a/src/test/java/org/apache/sysds/performance/Util.java +++ b/src/test/java/org/apache/sysds/performance/Util.java @@ -20,11 +20,13 @@ package org.apache.sysds.performance; import java.util.Arrays; -import java.util.concurrent.BlockingQueue; +import org.apache.sysds.performance.generators.IGenerate; import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing; public interface Util { + + public static double time(F f) { Timing time = new Timing(true); f.run(); @@ -44,28 +46,43 @@ public static void time(F f, double[] times, int i) { times[i] = time.stop(); } - public static double[] time(F f, int rep, BlockingQueue bq) throws InterruptedException { + public static double[] time(F f, int rep, IGenerate bq) throws InterruptedException { double[] times = new double[rep]; for(int i = 0; i < rep; i++) { while(bq.isEmpty()) - Thread.sleep(100); + Thread.sleep(bq.defaultWaitTime()); Util.time(f, times, i); } return times; } public static String stats(double[] v) { - + final int l = v.length ; + final int remove = (int)Math.floor((double)l * 0.05); Arrays.sort(v); - final int l = v.length; - double min = v[0]; - double max = v[l - 1]; - double q25 = v[(int) (l / 4)]; - double q50 = v[(int) (l / 2)]; - double q75 = v[(int) ((l / 4) * 3)]; + double total = 0; + final int el = v.length - remove *2; + for(int i = remove; i < l-remove; i++) + total += v[i]; + + double mean = total / el; + + double var = 0; + for(int i = remove; i < l-remove; i++) + var += Math.pow(Math.abs(v[i] - mean), 2); + + double std = Math.sqrt(var / el); + + return String.format("%8.3f+-%7.3f ms", mean, std); + + // double min = v[0]; + // double max = v[l - 1]; + // double q25 = v[(int) (l / 4)]; + // double q50 = v[(int) (l / 2)]; + // double q75 = v[(int) ((l / 4) * 3)]; - return String.format("[%.3f, %.3f, %.3f, %.3f, %.3f]", min, q25, q50, q75, max); + // return String.format("[%8.3f, %8.3f, %8.3f, %8.3f, %8.3f]", min, q25, q50, q75, max); } interface F { diff --git a/src/test/java/org/apache/sysds/performance/compression/APerfTest.java b/src/test/java/org/apache/sysds/performance/compression/APerfTest.java new file mode 100644 index 00000000000..d319ccbf6bb --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/compression/APerfTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.performance.compression; + +import java.util.ArrayList; + +import org.apache.sysds.performance.Util; +import org.apache.sysds.performance.Util.F; +import org.apache.sysds.performance.generators.IGenerate; + +public abstract class APerfTest { + + /** The Result array that all the results of the individual executions is producing */ + protected final ArrayList ret; + + /** A Task que that guarantee that the execution is not to long */ + protected final IGenerate gen; + + /** Default Repetitions */ + protected final int N; + + protected APerfTest(int N, IGenerate gen) { + ret = new ArrayList(N); + this.gen = gen; + this.N = N; + } + + protected void execute(F f, String name) throws InterruptedException { + warmup(f, 10); + gen.generate(N); + ret.clear(); + double[] times = Util.time(f, N, gen); + String retS = makeResString(times); + System.out.println(String.format("%35s, %s, %10s", name, Util.stats(times), retS)); + } + + protected void warmup(F f, int n) throws InterruptedException { + gen.generate(N); + ret.clear(); + } + + protected void execute(F f, String name, int N) throws InterruptedException { + gen.generate(N); + ret.clear(); + double[] times = Util.time(f, N, gen); + String retS = makeResString(times); + System.out.println(String.format("%35s, %s, %10s", name, Util.stats(times), retS)); + } + + protected abstract String makeResString(); + + protected String makeResString(double[] times){ + return makeResString(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(String.format("%20s ", this.getClass().getSimpleName())); + sb.append(" Repetitions: ").append(N).append(" "); + sb.append(gen); + return sb.toString(); + } +} diff --git a/src/test/java/org/apache/sysds/performance/compression/IOBandwidth.java b/src/test/java/org/apache/sysds/performance/compression/IOBandwidth.java new file mode 100644 index 00000000000..a063cf51afe --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/compression/IOBandwidth.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.performance.compression; + +import org.apache.sysds.performance.generators.IGenerate; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; +import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; +import org.apache.sysds.runtime.compress.colgroup.scheme.CompressionScheme; +import org.apache.sysds.runtime.compress.lib.CLALibScheme; +import org.apache.sysds.runtime.matrix.data.LibMatrixMult; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.test.TestUtils; + +public class IOBandwidth extends APerfTest { + + final int k; + + public IOBandwidth(int N, IGenerate gen) { + super(N, gen); + k = 1; + } + + public IOBandwidth(int N, IGenerate gen, int k) { + super(N, gen); + this.k = k; + } + + public void run() throws Exception, InterruptedException { + System.out.println(this); + warmup(() -> sumTask(k), N); + execute(() -> sumTask(k), "Sum"); + execute(() -> maxTask(k), "Max"); + final MatrixBlock v = genVector(); + execute(() -> matrixVector(v, k), "MV mult"); + + final CompressionScheme sch2 = CLALibScheme.getScheme(getC()); + execute(() -> updateAndApplyScheme(sch2, k), "Update&Apply Scheme"); + execute(() -> updateAndApplySchemeFused(sch2, k), "Update&Apply Scheme Fused"); + execute(() -> applyScheme(sch2, k), "Apply Scheme"); + execute(() -> fromEmptySchemeDoNotKeep(k), "Update&Apply from Empty"); + execute(() -> compressTask(k), "Normal Compression"); + + } + + public void runVector() throws Exception, InterruptedException { + System.out.println(this); + final MatrixBlock v = genVector(); + execute(() -> matrixVector(v, k), "MV mult"); + execute(() -> sumTask(k), "Sum"); + execute(() -> maxTask(k), "Max"); + } + + private void matrixVector(MatrixBlock v, int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock r = LibMatrixMult.matrixMult(mb, v, k); + long out = r.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void sumTask(int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock r = mb.sum(k); + long out = r.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void maxTask(int k){ + + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock r = mb.max(k); + long out = r.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void compressTask(int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = CompressedMatrixBlockFactory.compress(mb, k).getLeft(); + long out = cmb.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void applyScheme(CompressionScheme sch, int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = sch.encode(mb, k); + long out = cmb.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void updateAndApplyScheme(CompressionScheme sch, int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + sch.update(mb, k); + MatrixBlock cmb = sch.encode(mb, k); + long out = cmb.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void updateAndApplySchemeFused(CompressionScheme sch, int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = sch.updateAndEncode(mb, k); + long out = cmb.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void fromEmptySchemeDoNotKeep(int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + CompressionScheme sch = CLALibScheme.genScheme(CompressionType.EMPTY, mb.getNumColumns()); + MatrixBlock cmb = sch.updateAndEncode(mb, k); + long out = cmb.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private CompressedMatrixBlock getC() throws InterruptedException { + gen.generate(1); + MatrixBlock mb = gen.take(); + return (CompressedMatrixBlock) CompressedMatrixBlockFactory.compress(mb).getLeft(); + } + + private MatrixBlock genVector() throws InterruptedException { + gen.generate(1); + MatrixBlock mb = gen.take(); + MatrixBlock vector = TestUtils.generateTestMatrixBlock(mb.getNumColumns(), 1, -1.0, 1.0, 1.0, 324); + return vector; + } + + @Override + protected String makeResString() { + throw new RuntimeException("Do not call"); + } + + @Override + protected String makeResString(double[] times) { + double totalIn = 0; + double totalOut = 0; + double totalTime = 0.0; + for(int i = 0; i < ret.size(); i++) // set times + ret.get(i).time = times[i] / 1000; // ms to sec + + ret.sort(IOBandwidth::compare); + + final int l = ret.size(); + final int remove = (int) Math.floor((double) l * 0.05); + + final int el = l - remove * 2; + + for(int i = remove; i < ret.size() - remove; i++) { + InOut e = ret.get(i); + totalIn += e.in; + totalOut += e.out; + totalTime += e.time; + } + + double bytePerMsIn = totalIn / totalTime; + double bytePerMsOut = totalOut / totalTime; + // double meanTime = totalTime / el; + + double varIn = 0; + double varOut = 0; + // double varTime = 0; + + for(int i = remove; i < ret.size() - remove; i++) { + InOut e = ret.get(i); + varIn += Math.pow(e.in / e.time - bytePerMsIn, 2); + varOut += Math.pow(e.out / e.time - bytePerMsOut, 2); + } + + double stdIn = Math.sqrt(varIn / el); + double stdOut = Math.sqrt(varOut / el); + + return String.format("%12.0f+-%12.0f Byte/s, %12.0f+-%12.0f Byte/s", bytePerMsIn, stdIn, bytePerMsOut, stdOut); + } + + // protected String changeValue(double bytePerMs) { + // // double bytePerSec = bytePerMs * 1000; + // if(bytePerSec > 1000000000) { + // return String.format("%6.2f GB/s", bytePerSec / 1024 / 1024 / 1024); + // } + // else if(bytePerSec > 1000000) { + // return String.format("%6.2f MB/s", bytePerSec / 1024 / 1024); + // } + // else if(bytePerSec > 1000) { + // return String.format("%6.2f KB/s", bytePerSec / 1024); + // } + // else { + // return String.format("%6.2f B/s", bytePerSec); + // } + // } + + public static int compare(InOut a, InOut b) { + if(a.time == b.time) + return 0; + else if(a.time < b.time) + return -1; + else + return 1; + } + + @Override + public String toString() { + return super.toString() + " threads: " + k; + } + + protected class InOut { + protected long in; + protected long out; + protected double time; + + protected InOut(long in, long out) { + this.in = in; + this.out = out; + } + + } + +} diff --git a/src/test/java/org/apache/sysds/performance/compression/SchemaTest.java b/src/test/java/org/apache/sysds/performance/compression/SchemaTest.java new file mode 100644 index 00000000000..9eb81e72c4a --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/compression/SchemaTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sysds.performance.compression; + +import org.apache.sysds.performance.generators.IGenerate; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; +import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; +import org.apache.sysds.runtime.compress.colgroup.scheme.CompressionScheme; +import org.apache.sysds.runtime.compress.lib.CLALibScheme; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +public class SchemaTest extends APerfTest { + + public SchemaTest(int N, IGenerate gen) { + super(N, gen); + } + + public void run() throws Exception, InterruptedException { + System.out.println(this); + execute(() -> sumTask(), "Sum Task -- Warmup"); + execute(() -> compressTask(), "Compress Normal 10 blocks"); + final CompressedMatrixBlock cmb = (CompressedMatrixBlock) ret.get(0); + final CompressionScheme sch = CLALibScheme.getScheme(cmb); + execute(() -> updateScheme(sch), "Update Scheme"); + execute(() -> applyScheme(sch), "Apply Scheme"); + final CompressionScheme sch2 = CLALibScheme.getScheme(cmb); + execute(() -> updateAndApplyScheme(sch2), "Update & Apply Scheme"); + execute(() -> fromEmptyScheme(), "From Empty Update & Apply Scheme"); + } + + public void runCom() throws Exception, InterruptedException { + // execute(() -> sumTask(), "Sum Task -- Warmup", 10); + execute(() -> compressTaskDoNotKeep(), "Compress Normal 10 blocks", 10); + for(int i = 0; i < 100; i++){ + execute(() -> fromEmptySchemeDoNotKeep(), "From Empty Update & Apply Scheme", 10000); + } + // System.out.println((CompressedMatrixBlock) ret.get(0)); + } + + protected String makeResString() { + return ""; + } + + private void sumTask() { + gen.take().sum(); + } + + private void compressTask() { + ret.add(CompressedMatrixBlockFactory.compress(gen.take()).getLeft()); + } + + private void compressTaskDoNotKeep() { + CompressedMatrixBlockFactory.compress(gen.take()).getLeft(); + } + + private void updateScheme(CompressionScheme sch) { + sch.update(gen.take()); + } + + private void applyScheme(CompressionScheme sch) { + ret.add(sch.encode(gen.take())); + } + + private void updateAndApplyScheme(CompressionScheme sch) { + MatrixBlock mb = gen.take(); + sch.update(mb); + ret.add(sch.encode(mb)); + } + + private void fromEmptyScheme() { + MatrixBlock mb = gen.take(); + CompressionScheme sch = CLALibScheme.genScheme(CompressionType.EMPTY, mb.getNumColumns()); + sch.update(mb); + ret.add(sch.encode(mb)); + } + + private void fromEmptySchemeDoNotKeep() { + MatrixBlock mb = gen.take(); + CompressionScheme sch = CLALibScheme.genScheme(CompressionType.EMPTY, mb.getNumColumns()); + + sch.update(mb); + sch.encode(mb); + } +} diff --git a/src/test/java/org/apache/sysds/performance/compression/Serialize.java b/src/test/java/org/apache/sysds/performance/compression/Serialize.java new file mode 100644 index 00000000000..17fc3feda0a --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/compression/Serialize.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.performance.compression; + +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.apache.sysds.performance.generators.IGenerate; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; +import org.apache.sysds.runtime.compress.colgroup.scheme.CompressionScheme; +import org.apache.sysds.runtime.compress.io.WriterCompressed; +import org.apache.sysds.runtime.compress.lib.CLALibScheme; +import org.apache.sysds.runtime.io.MatrixWriter; +import org.apache.sysds.runtime.io.WriterBinaryBlockParallel; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +public class Serialize extends APerfTest { + + static final String file = "./perftmp.bin"; + final int k; + + public Serialize(int N, IGenerate gen) { + super(N, gen); + k = 1; + } + + public Serialize(int N, IGenerate gen, int k) { + super(N, gen); + this.k = k; + } + + public void run() throws Exception, InterruptedException { + System.out.println(this); + warmup(() -> sumTask(k), N); + cleanup(); + execute(() -> writeUncompressed(k), "Serialize"); + execute(() -> diskUncompressed(k), "CustomDisk"); + cleanup(); + execute(() -> standardIO(k), "StandardDisk"); + cleanup(); + + execute(() -> compressTask(k), "Compress Normal"); + execute(() -> writeCompressTask(k), "Compress Normal Serialize"); + execute(() -> diskCompressTask(k), "Compress Normal CustomDisk"); + cleanup(); + execute(() -> standardCompressedIO(k), "Compress StandardIO"); + cleanup(); + + final CompressionScheme sch2 = CLALibScheme.getScheme(getC()); + execute(() -> updateAndApplySchemeFused(sch2, k), "Update&Apply Scheme Fused"); + execute(() -> writeUpdateAndApplySchemeFused(sch2, k), "Update&Apply Scheme Fused Serialize"); + cleanup(); + execute(() -> diskUpdateAndApplySchemeFused(sch2, k), "Update&Apply Scheme Fused Disk"); + cleanup(); + execute(() -> standardCompressedIOUpdateAndApply(sch2, k), "Update&Apply Standard IO"); + } + + public void run(int i) throws Exception, InterruptedException { + warmup(() -> sumTask(k), N); + final CompressionScheme sch = CLALibScheme.getScheme(getC()); + cleanup(); + switch(i) { + case 1: + execute(() -> writeUncompressed(k), "Serialize"); + break; + case 2: + execute(() -> diskUncompressed(k), "CustomDisk"); + break; + case 3: + execute(() -> standardIO(k), "StandardDisk"); + break; + case 4: + execute(() -> compressTask(k), "Compress Normal"); + break; + case 5: + execute(() -> writeCompressTask(k), "Compress Normal Serialize"); + break; + case 6: + execute(() -> diskCompressTask(k), "Compress Normal CustomDisk"); + break; + case 7: + execute(() -> standardCompressedIO(k), "Compress StandardIO"); + break; + case 8: + execute(() -> updateAndApplySchemeFused(sch, k), "Update&Apply Scheme Fused"); + break; + case 9: + execute(() -> writeUpdateAndApplySchemeFused(sch, k), "Update&Apply Scheme Fused Serialize"); + break; + case 10: + execute(() -> diskUpdateAndApplySchemeFused(sch, k), "Update&Apply Scheme Fused Disk"); + break; + case 11: + execute(() -> standardCompressedIOUpdateAndApply(sch, k), "Update&Apply Standard IO"); + break; + } + cleanup(); + } + + private void writeUncompressed(int k) { + MatrixBlock mb = gen.take(); + Sink o = serialize(mb); + ret.add(new InOut(mb.getInMemorySize(), o.size())); + } + + private void diskUncompressed(int k) { + MatrixBlock mb = gen.take(); + Disk o = serializeD(mb); + ret.add(new InOut(mb.getInMemorySize(), o.size())); + } + + private void standardIO(int k) { + try { + MatrixWriter w = new WriterBinaryBlockParallel(1); + MatrixBlock mb = gen.take(); + w.writeMatrixToHDFS(mb, file, mb.getNumRows(), mb.getNumColumns(), 1000, mb.getNonZeros(), false); + ret.add(new InOut(mb.getInMemorySize(), Files.size(Paths.get(file)))); + } + catch(Exception e) { + throw new RuntimeException(e); + } + } + + private void compressTask(int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = CompressedMatrixBlockFactory.compress(mb, k).getLeft(); + long out = cmb.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void writeCompressTask(int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = CompressedMatrixBlockFactory.compress(mb, k).getLeft(); + Sink o = serialize(cmb); + ret.add(new InOut(in, o.size())); + } + + private void diskCompressTask(int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = CompressedMatrixBlockFactory.compress(mb, k).getLeft(); + Disk o = serializeD(cmb); + ret.add(new InOut(in, o.size())); + } + + private void standardCompressedIO(int k) { + try { + // MatrixWriter w = new WriterBinaryBlockParallel(1); + MatrixBlock mb = gen.take(); + WriterCompressed.writeCompressedMatrixToHDFS(mb, file); + ret.add(new InOut(mb.getInMemorySize(), Files.size(Paths.get(file)))); + } + catch(Exception e) { + throw new RuntimeException(e); + } + } + + private void updateAndApplySchemeFused(CompressionScheme sch, int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = sch.updateAndEncode(mb, k); + long out = cmb.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void writeUpdateAndApplySchemeFused(CompressionScheme sch, int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = sch.updateAndEncode(mb, k); + Sink o = serialize(cmb); + ret.add(new InOut(in, o.size())); + } + + private void diskUpdateAndApplySchemeFused(CompressionScheme sch, int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = sch.updateAndEncode(mb, k); + Disk o = serializeD(cmb); + ret.add(new InOut(in, o.size())); + } + + private void standardCompressedIOUpdateAndApply(CompressionScheme sch, int k) { + try { + // MatrixWriter w = new WriterBinaryBlockParallel(1); + MatrixBlock mb = gen.take(); + MatrixBlock cmb = sch.updateAndEncode(mb, k); + WriterCompressed.writeCompressedMatrixToHDFS(cmb, file); + ret.add(new InOut(mb.getInMemorySize(), Files.size(Paths.get(file)))); + } + catch(Exception e) { + throw new RuntimeException(e); + } + } + + private void sumTask(int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock r = mb.sum(k); + long out = r.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private CompressedMatrixBlock getC() throws InterruptedException { + gen.generate(1); + MatrixBlock mb = gen.take(); + return (CompressedMatrixBlock) CompressedMatrixBlockFactory.compress(mb).getLeft(); + } + + @Override + protected String makeResString() { + throw new RuntimeException("Do not call"); + } + + @Override + protected String makeResString(double[] times) { + double totalIn = 0; + double totalOut = 0; + double totalTime = 0.0; + for(int i = 0; i < ret.size(); i++) // set times + ret.get(i).time = times[i] / 1000; // ms to sec + + ret.sort(Serialize::compare); + + final int l = ret.size(); + final int remove = (int) Math.floor((double) l * 0.05); + + final int el = l - remove * 2; + + for(int i = remove; i < ret.size() - remove; i++) { + InOut e = ret.get(i); + totalIn += e.in; + totalOut += e.out; + totalTime += e.time; + } + + double bytePerMsIn = totalIn / totalTime; + double bytePerMsOut = totalOut / totalTime; + // double meanTime = totalTime / el; + + double varIn = 0; + double varOut = 0; + // double varTime = 0; + + for(int i = remove; i < ret.size() - remove; i++) { + InOut e = ret.get(i); + varIn += Math.pow(e.in / e.time - bytePerMsIn, 2); + varOut += Math.pow(e.out / e.time - bytePerMsOut, 2); + } + + double stdIn = Math.sqrt(varIn / el); + double stdOut = Math.sqrt(varOut / el); + + return String.format("%12.0f+-%12.0f Byte/s, %12.0f+-%12.0f Byte/s", bytePerMsIn, stdIn, bytePerMsOut, stdOut); + } + + public static int compare(InOut a, InOut b) { + if(a.time == b.time) + return 0; + else if(a.time < b.time) + return -1; + else + return 1; + } + + public static Sink serialize(MatrixBlock mb) { + try { + Sink s = new Sink(); + DataOutputStream fos = new DataOutputStream(s); + mb.write(fos); + return s; + } + catch(IOException e) { + throw new RuntimeException(e); + } + } + + public static Disk serializeD(MatrixBlock mb) { + try { + Disk s = new Disk(); + DataOutputStream fos = new DataOutputStream(s); + mb.write(fos); + return s; + } + catch(IOException e) { + throw new RuntimeException(e); + } + } + + private static class Sink extends OutputStream { + long s = 0L; + + @Override + public void write(int b) throws IOException { + s++; + } + + @Override + public void write(byte[] b) throws IOException { + s += b.length; + } + + public long size() { + return s; + } + + } + + private static class Disk extends OutputStream { + final FileOutputStream writer; + final BufferedOutputStream buf; + long s = 0L; + + protected Disk() throws FileNotFoundException { + writer = new FileOutputStream(file); + buf = new BufferedOutputStream(writer, 4096); + } + + @Override + public void write(int b) throws IOException { + s++; + buf.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + s += b.length; + buf.write(b); + } + + public long size() { + try { + buf.close(); + writer.close(); + return s; + } + catch(Exception e) { + return s; + } + } + } + + private void cleanup() { + File f = new File(file); + if(f.exists()) { + if(f.isDirectory()) + deleteDirectory(f); + else + f.delete(); + } + } + + boolean deleteDirectory(File directoryToBeDeleted) { + File[] allContents = directoryToBeDeleted.listFiles(); + if(allContents != null) { + for(File file : allContents) { + deleteDirectory(file); + } + } + return directoryToBeDeleted.delete(); + } + + @Override + public String toString() { + return super.toString() + " threads: " + k; + } + + protected class InOut { + protected long in; + protected long out; + protected double time; + + protected InOut(long in, long out) { + this.in = in; + this.out = out; + } + + } + +} diff --git a/src/test/java/org/apache/sysds/performance/compression/SteamCompressTest.java b/src/test/java/org/apache/sysds/performance/compression/SteamCompressTest.java deleted file mode 100644 index e414e516320..00000000000 --- a/src/test/java/org/apache/sysds/performance/compression/SteamCompressTest.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysds.performance.compression; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.util.ArrayList; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.zip.Deflater; -import java.util.zip.DeflaterOutputStream; - -import org.apache.sysds.performance.Util; -import org.apache.sysds.performance.Util.F; -import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; -import org.apache.sysds.runtime.matrix.data.MatrixBlock; -import org.apache.sysds.runtime.util.CommonThreadPool; -import org.apache.sysds.test.TestUtils; - -public class SteamCompressTest { - - private static BlockingQueue tasks = new ArrayBlockingQueue<>(8); - private static ArrayList ret; - - public static void P1() throws Exception, InterruptedException { - System.out.println("Running Steam Compression Test"); - CommonThreadPool.get(2); - - execute(() -> sumTask(), "Sum Task -- Warmup"); - execute(() -> blockSizeTask(), "In Memory Block Size"); - execute(() -> writeSteam(), "Write Blocks Stream"); - execute(() -> writeSteamDeflaterOutputStreamDef(), "Write Stream Deflate"); - execute(() -> writeSteamDeflaterOutputStreamSpeed(), "Write Stream Deflate Speedy"); - execute(() -> compressTask(), "In Memory Compress Individual (CI)"); - execute(() -> writeStreamCompressTask(), "Write CI Stream"); - execute(() -> writeStreamCompressDeflaterOutputStreamTask(), "Write CI Deflate Stream"); - execute(() -> writeStreamCompressDeflaterOutputStreamTaskSpeedy(), "Write CI Deflate Stream Speedy"); - - } - - private static void execute(F f, String name) throws InterruptedException { - final int N = 100; - fillTasks(N); - if(ret == null) - ret = new ArrayList(); - else - ret.clear(); - double[] times = Util.time(f, N, tasks); - Double avgRes = ret.stream().mapToDouble(a -> a).average().getAsDouble(); - System.out.println(String.format("%35s, %50s, %10.2f", name, Util.stats(times), avgRes)); - - } - - private static void sumTask() { - try { - ret.add(tasks.take().sum()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed sum"); - } - } - - private static void blockSizeTask() { - try { - ret.add((double)tasks.take().getInMemorySize()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed sum"); - } - } - - private static void compressTask() { - try { - MatrixBlock mb = CompressedMatrixBlockFactory.compress(tasks.take()).getLeft(); - ret.add((double) mb.getInMemorySize()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed compress"); - } - } - - private static void writeStreamCompressTask() { - try { - MatrixBlock mb = CompressedMatrixBlockFactory.compress(tasks.take()).getLeft(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream fos = new DataOutputStream(bos); - mb.write(fos); - ret.add((double) bos.size()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed compress"); - } - } - - private static void writeStreamCompressDeflaterOutputStreamTask() { - try { - MatrixBlock mb = CompressedMatrixBlockFactory.compress(tasks.take()).getLeft(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DeflaterOutputStream decorator = new DeflaterOutputStream(bos); - DataOutputStream fos = new DataOutputStream(decorator); - mb.write(fos); - ret.add((double) bos.size()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed compress"); - } - } - - private static void writeStreamCompressDeflaterOutputStreamTaskSpeedy() { - try { - MatrixBlock mb = CompressedMatrixBlockFactory.compress(tasks.take()).getLeft(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DeflaterOutputStream decorator = new DeflaterOutputStream(bos, new Deflater(Deflater.BEST_SPEED)); - DataOutputStream fos = new DataOutputStream(decorator); - mb.write(fos); - ret.add((double) bos.size()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed compress"); - } - } - - private static void writeSteam() { - try { - MatrixBlock mb = tasks.take(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream fos = new DataOutputStream(bos); - mb.write(fos); - ret.add((double) bos.size()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("failed Write Stream"); - } - } - - private static void writeSteamDeflaterOutputStreamDef() { - try { - MatrixBlock mb = tasks.take(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DeflaterOutputStream decorator = new DeflaterOutputStream(bos); - DataOutputStream fos = new DataOutputStream(decorator); - mb.write(fos); - ret.add((double) bos.size()); - - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed compress"); - } - } - - private static void writeSteamDeflaterOutputStreamSpeed() { - try { - MatrixBlock mb = tasks.take(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DeflaterOutputStream decorator = new DeflaterOutputStream(bos, new Deflater(Deflater.BEST_SPEED)); - DataOutputStream fos = new DataOutputStream(decorator); - mb.write(fos); - ret.add((double) bos.size()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed compress"); - } - } - - private static void fillTasks(int nBlocks) { - CompletableFuture.runAsync(() -> { - - for(int i = 0; i < nBlocks; i++) { - MatrixBlock mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1000, 100, 0, 32, 0.2, i)); - try { - tasks.put(mb); - } - catch(InterruptedException e) { - e.printStackTrace(); - } - } - }); - } -} diff --git a/src/test/java/org/apache/sysds/performance/compression/StreamCompress.java b/src/test/java/org/apache/sysds/performance/compression/StreamCompress.java new file mode 100644 index 00000000000..97418a570ca --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/compression/StreamCompress.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.performance.compression; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; + +import org.apache.sysds.performance.generators.IGenerate; +import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +public class StreamCompress extends APerfTest { + + public StreamCompress(int N, IGenerate gen) { + super(N, gen); + } + + public void run() throws Exception, InterruptedException, IOException { + System.out.println("Running Steam Compression Test"); + System.out.println(this); + + warmup(() -> sumTask(), 10); + execute(() -> blockSizeTask(), "In Memory Block Size"); + execute(() -> writeSteam(), "Write Blocks Stream"); + execute(() -> writeSteamDeflaterOutputStreamDef(), "Write Stream Deflate"); + execute(() -> writeSteamDeflaterOutputStreamSpeed(), "Write Stream Deflate Speedy"); + execute(() -> compressTask(), "In Memory Compress Individual (CI)"); + execute(() -> writeStreamCompressTask(), "Write CI Stream"); + execute(() -> writeStreamCompressDeflaterOutputStreamTask(), "Write CI Deflate Stream"); + execute(() -> writeStreamCompressDeflaterOutputStreamTaskSpeedy(), "Write CI Deflate Stream Speedy"); + + } + + @Override + protected String makeResString() { + Double avgRes = ret.stream().mapToDouble(a -> a).average().getAsDouble(); + return String.format("%10.2f", avgRes); + } + + private void sumTask() { + ret.add(gen.take().sum()); + } + + private void blockSizeTask() { + ret.add((double) gen.take().getInMemorySize()); + } + + private void compressTask() { + + MatrixBlock mb = CompressedMatrixBlockFactory.compress(gen.take()).getLeft(); + ret.add((double) mb.getInMemorySize()); + + } + + private void writeStreamCompressTask() { + + MatrixBlock mb = CompressedMatrixBlockFactory.compress(gen.take()).getLeft(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream fos = new DataOutputStream(bos); + try { + mb.write(fos); + } + catch(IOException e) { + throw new RuntimeException(e); + } + ret.add((double) bos.size()); + + } + + private void writeStreamCompressDeflaterOutputStreamTask() { + + MatrixBlock mb = CompressedMatrixBlockFactory.compress(gen.take()).getLeft(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DeflaterOutputStream decorator = new DeflaterOutputStream(bos); + DataOutputStream fos = new DataOutputStream(decorator); + try { + mb.write(fos); + } + catch(IOException e) { + throw new RuntimeException(e); + } + ret.add((double) bos.size()); + + } + + private void writeStreamCompressDeflaterOutputStreamTaskSpeedy() { + + MatrixBlock mb = CompressedMatrixBlockFactory.compress(gen.take()).getLeft(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DeflaterOutputStream decorator = new DeflaterOutputStream(bos, new Deflater(Deflater.BEST_SPEED)); + DataOutputStream fos = new DataOutputStream(decorator); + try { + mb.write(fos); + } + catch(IOException e) { + throw new RuntimeException(e); + } + ret.add((double) bos.size()); + + } + + private void writeSteam() { + + MatrixBlock mb = gen.take(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream fos = new DataOutputStream(bos); + try { + mb.write(fos); + } + catch(IOException e) { + throw new RuntimeException(e); + } + ret.add((double) bos.size()); + + } + + private void writeSteamDeflaterOutputStreamDef() { + + MatrixBlock mb = gen.take(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DeflaterOutputStream decorator = new DeflaterOutputStream(bos); + DataOutputStream fos = new DataOutputStream(decorator); + try { + mb.write(fos); + } + catch(IOException e) { + throw new RuntimeException(e); + } + ret.add((double) bos.size()); + + } + + private void writeSteamDeflaterOutputStreamSpeed() { + + MatrixBlock mb = gen.take(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DeflaterOutputStream decorator = new DeflaterOutputStream(bos, new Deflater(Deflater.BEST_SPEED)); + DataOutputStream fos = new DataOutputStream(decorator); + try { + mb.write(fos); + } + catch(IOException e) { + throw new RuntimeException(e); + } + ret.add((double) bos.size()); + + } + +} diff --git a/src/test/java/org/apache/sysds/performance/generators/ConstMatrix.java b/src/test/java/org/apache/sysds/performance/generators/ConstMatrix.java new file mode 100644 index 00000000000..f01d0a2075b --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/generators/ConstMatrix.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.performance.generators; + +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.functionobjects.ReduceAll; +import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction.AUType; +import org.apache.sysds.runtime.matrix.data.LibMatrixCountDistinct; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator; +import org.apache.sysds.test.TestUtils; + +public class ConstMatrix implements IGenerate { + + protected final MatrixBlock mb; + protected final int nVal; + + public ConstMatrix(MatrixBlock mb) { + this.mb = mb; + this.nVal = (int) LibMatrixCountDistinct + .estimateDistinctValues(mb, + new CountDistinctOperator(AUType.COUNT_DISTINCT, Types.Direction.RowCol, ReduceAll.getReduceAllFnObject())) + .getValue(0, 0); + } + + public ConstMatrix(int r, int c, int nVal, double s) { + this.mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(r, c, 0, nVal, s, 42)); + this.nVal = nVal; + } + + @Override + public MatrixBlock take() { + return mb; + } + + @Override + public void generate(int N) throws InterruptedException { + // do nothing + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(this.getClass().getSimpleName()); + sb.append(" ( Rows:").append(mb.getNumRows()); + sb.append(", Cols:").append(mb.getNumColumns()); + sb.append(", Spar:").append(mb.getSparsity()); + sb.append(", Unique: ").append(nVal); + sb.append(")"); + return sb.toString(); + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public int defaultWaitTime() { + return 0; + } + +} diff --git a/src/test/java/org/apache/sysds/performance/generators/GenMatrices.java b/src/test/java/org/apache/sysds/performance/generators/GenMatrices.java new file mode 100644 index 00000000000..f96233ae6fe --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/generators/GenMatrices.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.performance.generators; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; + +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.CommonThreadPool; +import org.apache.sysds.test.TestUtils; + +public class GenMatrices implements IGenerate { + + /** A Task que that guarantee that the execution is not to long */ + protected final BlockingQueue tasks; + /** The number of rows in each task block */ + protected final int r; + /** The number of cols in each task block */ + protected final int c; + /** The number of max unique values */ + protected final int nVal; + /** The sparsity of the generated matrices */ + protected final double s; + /** The initial seed */ + protected final int seed; + + public GenMatrices(int r, int c, int nVal, double s) { + // Make a thread pool if not already there + CommonThreadPool.get(); + tasks = new ArrayBlockingQueue<>(8); + this.r = r; + this.c = c; + this.nVal = nVal; + this.s = s; + this.seed = 42; + } + + @Override + public MatrixBlock take() { + try { + return tasks.take(); + } + catch(Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void generate(int N) throws InterruptedException { + CompletableFuture.runAsync(() -> { + try { + for(int i = 0; i < N; i++) { + tasks.put(TestUtils.ceil(TestUtils.generateTestMatrixBlock(r, c, 0, nVal, s, i + seed))); + } + } + catch(InterruptedException e) { + e.printStackTrace(); + } + }); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(this.getClass().getSimpleName()); + sb.append(" rand(").append(r).append(", ").append(c).append(", ").append(nVal).append(", ").append(s).append(")"); + sb.append(" Seed: ").append(seed); + return sb.toString(); + } + + @Override + public boolean isEmpty() { + return tasks.isEmpty(); + } + + @Override + public int defaultWaitTime() { + return 100; + } + +} diff --git a/src/test/java/org/apache/sysds/performance/generators/IGenerate.java b/src/test/java/org/apache/sysds/performance/generators/IGenerate.java new file mode 100644 index 00000000000..ee39590bf31 --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/generators/IGenerate.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.performance.generators; + +/** + * Generator interface for task generation. + */ +public interface IGenerate { + + /** + * Validate if the generator is empty, and we have to wait for elements. + * + * @return If the generator is empty + */ + public boolean isEmpty(); + + /** + * Default wait time for the generator to fill + * + * @return The wait time + */ + public int defaultWaitTime(); + + /** + * A Blocking take operation that waits for the Generator to fill that element + * + * @return An task element + */ + public T take(); + + /** + * A Non blocking async operation that generates elements for the task que + * + * @param N The number of elements to create + * @throws InterruptedException An exception if the task is interrupted + */ + public void generate(int N) throws InterruptedException; + +}