Skip to content

Commit

Permalink
[SYSTEMDS-???] SchemaApplyPerfTests
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Aug 8, 2023
1 parent 8dbfc23 commit abebe83
Show file tree
Hide file tree
Showing 14 changed files with 1,388 additions and 244 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@
<mainClass>org.apache.sysds.performance.Main</mainClass>
</manifest>
<manifestEntries>
<Class-Path>SystemDS.jar SystemDS-tests.jar</Class-Path>
<Class-Path>SystemDS.jar ${project.build.directory}/${project.artifactId}-${project.version}-tests.jar</Class-Path>
</manifestEntries>
</archive>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.sysds.hops.fedplanner.FTypes.FType;
import org.apache.sysds.lops.Lop;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
Expand Down Expand Up @@ -592,6 +593,11 @@ protected MatrixBlock reconstructByLineage(LineageItem li) throws IOException {
.acquireReadAndRelease();
}

@Override
public boolean isCompressed(){
return _partitionInMemory instanceof CompressedMatrixBlock || super.isCompressed();
}

@Override
public String toString(){
StringBuilder sb = new StringBuilder(super.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,45 +65,43 @@ public static AggregateBinaryCPInstruction parseInstruction(String str) {
if(numFields == 6) {
boolean isLeftTransposed = Boolean.parseBoolean(parts[5]);
boolean isRightTransposed = Boolean.parseBoolean(parts[6]);
return new AggregateBinaryCPInstruction(aggbin,
in1, in2, out, opcode, str, isLeftTransposed, isRightTransposed);
return new AggregateBinaryCPInstruction(aggbin, in1, in2, out, opcode, str, isLeftTransposed,
isRightTransposed);
}
return new AggregateBinaryCPInstruction(aggbin, in1, in2, out, opcode, str);
}

@Override
public void processInstruction(ExecutionContext ec) {
MatrixBlock matBlock1 = ec.getMatrixInput(input1.getName());
MatrixBlock matBlock2 = ec.getMatrixInput(input2.getName());
// check compressed inputs
final boolean comp1 = ec.getMatrixObject(input1.getName()).isCompressed();
final boolean comp2 = ec.getMatrixObject(input2.getName()).isCompressed();
final boolean comp1 = matBlock1 instanceof CompressedMatrixBlock;
final boolean comp2 = matBlock2 instanceof CompressedMatrixBlock;

if(comp1 || comp2)
processCompressedAggregateBinary(ec, comp1, comp2);
processCompressedAggregateBinary(ec, matBlock1, matBlock2, comp1, comp2);
else if(transposeLeft || transposeRight)
processTransposedFusedAggregateBinary(ec);
processTransposedFusedAggregateBinary(ec, matBlock1, matBlock2);
else
processNormal(ec);
}
processNormal(ec, matBlock1, matBlock2);

private void processNormal(ExecutionContext ec) {
// get inputs
MatrixBlock matBlock1 = ec.getMatrixInput(input1.getName());
MatrixBlock matBlock2 = ec.getMatrixInput(input2.getName());
}

private void processNormal(ExecutionContext ec, MatrixBlock matBlock1, MatrixBlock matBlock2) {
// compute matrix multiplication
AggregateBinaryOperator ab_op = (AggregateBinaryOperator) _optr;
MatrixBlock ret;

ret = matBlock1.aggregateBinaryOperations(matBlock1, matBlock2, new MatrixBlock(), ab_op);
MatrixBlock ret = matBlock1.aggregateBinaryOperations(matBlock1, matBlock2, new MatrixBlock(), ab_op);

// release inputs/outputs
ec.releaseMatrixInput(input1.getName());
ec.releaseMatrixInput(input2.getName());
ec.setMatrixOutput(output.getName(), ret);
}

private void processTransposedFusedAggregateBinary(ExecutionContext ec) {
MatrixBlock matBlock1 = ec.getMatrixInput(input1.getName());
MatrixBlock matBlock2 = ec.getMatrixInput(input2.getName());
private void processTransposedFusedAggregateBinary(ExecutionContext ec, MatrixBlock matBlock1,
MatrixBlock matBlock2) {

// compute matrix multiplication
AggregateBinaryOperator ab_op = (AggregateBinaryOperator) _optr;
MatrixBlock ret;
Expand All @@ -127,9 +125,9 @@ private void processTransposedFusedAggregateBinary(ExecutionContext ec) {
ec.setMatrixOutput(output.getName(), ret);
}

private void processCompressedAggregateBinary(ExecutionContext ec, boolean c1, boolean c2) {
MatrixBlock matBlock1 = ec.getMatrixInput(input1.getName());
MatrixBlock matBlock2 = ec.getMatrixInput(input2.getName());
private void processCompressedAggregateBinary(ExecutionContext ec, MatrixBlock matBlock1, MatrixBlock matBlock2,
boolean c1, boolean c2) {

// compute matrix multiplication
AggregateBinaryOperator ab_op = (AggregateBinaryOperator) _optr;
MatrixBlock ret;
Expand Down
107 changes: 102 additions & 5 deletions src/test/java/org/apache/sysds/performance/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,114 @@

package org.apache.sysds.performance;

import org.apache.sysds.performance.compression.SteamCompressTest;
import org.apache.sysds.performance.compression.IOBandwidth;
import org.apache.sysds.performance.compression.SchemaTest;
import org.apache.sysds.performance.compression.Serialize;
import org.apache.sysds.performance.compression.StreamCompress;
import org.apache.sysds.performance.generators.ConstMatrix;
import org.apache.sysds.performance.generators.GenMatrices;
import org.apache.sysds.runtime.util.CommonThreadPool;

public class Main {

private static void exec(int prog, String[] args) throws InterruptedException, Exception {
switch(prog) {
case 1:
new StreamCompress(100, new GenMatrices(10000, 100, 32, 1.0)).run();
break;
case 2:
new SchemaTest(100, new GenMatrices(10000, 1000, 32, 1.0)).run();
break;
case 3:
new SchemaTest(100, new GenMatrices(1000, 1, 32, 1.0)).run();
new SchemaTest(100, new GenMatrices(1000, 10, 32, 1.0)).run();
new SchemaTest(100, new GenMatrices(1000, 100, 32, 1.0)).run();
new SchemaTest(100, new GenMatrices(1000, 1000, 32, 1.0)).run();
break;
case 4:
new SchemaTest(100, new GenMatrices(1000, 1000, 1, 1.0)).run();
new SchemaTest(100, new GenMatrices(1000, 1000, 2, 1.0)).run();
new SchemaTest(100, new GenMatrices(1000, 1000, 4, 1.0)).run();
new SchemaTest(100, new GenMatrices(1000, 1000, 8, 1.0)).run();
new SchemaTest(100, new GenMatrices(1000, 1000, 16, 1.0)).run();
new SchemaTest(100, new GenMatrices(1000, 1000, 32, 1.0)).run();
new SchemaTest(100, new GenMatrices(1000, 1000, 64, 1.0)).run();
new SchemaTest(100, new GenMatrices(1000, 1000, 128, 1.0)).run();
new SchemaTest(100, new GenMatrices(1000, 1000, 256, 1.0)).run();
new SchemaTest(100, new GenMatrices(1000, 1000, 512, 1.0)).run();
break;
case 5:
new SchemaTest(100, new ConstMatrix(1000, 100, 32, 1.0)).runCom();
break;
case 6:
new SchemaTest(100, new GenMatrices(1000, 1000, 32, 0.3)).run();
break;
case 7:
new SchemaTest(100, new ConstMatrix(1000, 1000, 32, 0.3)).runCom();
break;
case 8:
new IOBandwidth(100, new ConstMatrix(1000, 1000, 32, 1.0)).run();
break;
case 9:
run9(args);
break;
case 10:
run10(args);
break;
case 11:
run11(args, -1);
break;
case 12:
run11(args, Integer.parseInt(args[7]));
break;
default:
break;
}
}

private static void run9(String[] args) throws InterruptedException, Exception {
int rows = Integer.parseInt(args[1]);
int cols = Integer.parseInt(args[2]);
int unique = Integer.parseInt(args[3]);
double sparsity = Double.parseDouble(args[4]);
int k = Integer.parseInt(args[5]);
int n = Integer.parseInt(args[6]);
new IOBandwidth(n, new ConstMatrix(rows, cols, unique, sparsity), k).run();
}

private static void run10(String[] args) throws InterruptedException, Exception {
int rows = Integer.parseInt(args[1]);
int cols = Integer.parseInt(args[2]);
int unique = Integer.parseInt(args[3]);
double sparsity = Double.parseDouble(args[4]);
int k = Integer.parseInt(args[5]);
int n = Integer.parseInt(args[6]);
new IOBandwidth(n, new ConstMatrix(rows, cols, unique, sparsity), k).runVector();
}

private static void run11(String[] args, int id) throws InterruptedException, Exception {
int rows = Integer.parseInt(args[1]);
int cols = Integer.parseInt(args[2]);
int unique = Integer.parseInt(args[3]);
double sparsity = Double.parseDouble(args[4]);
int k = Integer.parseInt(args[5]);
int n = Integer.parseInt(args[6]);

Serialize s = new Serialize(n, new ConstMatrix(rows, cols, unique, sparsity), k);

if(id == -1)
s.run();
else
s.run(id);
}

public static void main(String[] args) {
try{
SteamCompressTest.P1();
try {
exec(Integer.parseInt(args[0]), args);
CommonThreadPool.get().shutdown();
}
catch(Exception e){
catch(Exception e) {
e.printStackTrace();

}
}
}
39 changes: 28 additions & 11 deletions src/test/java/org/apache/sysds/performance/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
package org.apache.sysds.performance;

import java.util.Arrays;
import java.util.concurrent.BlockingQueue;

import org.apache.sysds.performance.generators.IGenerate;
import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;

public interface Util {


public static double time(F f) {
Timing time = new Timing(true);
f.run();
Expand All @@ -44,28 +46,43 @@ public static void time(F f, double[] times, int i) {
times[i] = time.stop();
}

public static double[] time(F f, int rep, BlockingQueue<?> bq) throws InterruptedException {
public static double[] time(F f, int rep, IGenerate<?> bq) throws InterruptedException {
double[] times = new double[rep];
for(int i = 0; i < rep; i++) {
while(bq.isEmpty())
Thread.sleep(100);
Thread.sleep(bq.defaultWaitTime());
Util.time(f, times, i);
}
return times;
}

public static String stats(double[] v) {

final int l = v.length ;
final int remove = (int)Math.floor((double)l * 0.05);
Arrays.sort(v);
final int l = v.length;

double min = v[0];
double max = v[l - 1];
double q25 = v[(int) (l / 4)];
double q50 = v[(int) (l / 2)];
double q75 = v[(int) ((l / 4) * 3)];
double total = 0;
final int el = v.length - remove *2;
for(int i = remove; i < l-remove; i++)
total += v[i];

double mean = total / el;

double var = 0;
for(int i = remove; i < l-remove; i++)
var += Math.pow(Math.abs(v[i] - mean), 2);

double std = Math.sqrt(var / el);

return String.format("%8.3f+-%7.3f ms", mean, std);

// double min = v[0];
// double max = v[l - 1];
// double q25 = v[(int) (l / 4)];
// double q50 = v[(int) (l / 2)];
// double q75 = v[(int) ((l / 4) * 3)];

return String.format("[%.3f, %.3f, %.3f, %.3f, %.3f]", min, q25, q50, q75, max);
// return String.format("[%8.3f, %8.3f, %8.3f, %8.3f, %8.3f]", min, q25, q50, q75, max);
}

interface F {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.performance.compression;

import java.util.ArrayList;

import org.apache.sysds.performance.Util;
import org.apache.sysds.performance.Util.F;
import org.apache.sysds.performance.generators.IGenerate;

public abstract class APerfTest<T, G> {

/** The Result array that all the results of the individual executions is producing */
protected final ArrayList<T> ret;

/** A Task que that guarantee that the execution is not to long */
protected final IGenerate<G> gen;

/** Default Repetitions */
protected final int N;

protected APerfTest(int N, IGenerate<G> gen) {
ret = new ArrayList<T>(N);
this.gen = gen;
this.N = N;
}

protected void execute(F f, String name) throws InterruptedException {
warmup(f, 10);
gen.generate(N);
ret.clear();
double[] times = Util.time(f, N, gen);
String retS = makeResString(times);
System.out.println(String.format("%35s, %s, %10s", name, Util.stats(times), retS));
}

protected void warmup(F f, int n) throws InterruptedException {
gen.generate(N);
ret.clear();
}

protected void execute(F f, String name, int N) throws InterruptedException {
gen.generate(N);
ret.clear();
double[] times = Util.time(f, N, gen);
String retS = makeResString(times);
System.out.println(String.format("%35s, %s, %10s", name, Util.stats(times), retS));
}

protected abstract String makeResString();

protected String makeResString(double[] times){
return makeResString();
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(String.format("%20s ", this.getClass().getSimpleName()));
sb.append(" Repetitions: ").append(N).append(" ");
sb.append(gen);
return sb.toString();
}
}
Loading

0 comments on commit abebe83

Please sign in to comment.