[SYSTEMDS-???] SchemaApplyPerfTests
Add Two Range Index

SDC Scheme

Write test

Parallelization degree on the unary aggregate sum on the MatrixBlock object

CoCode priority-queue shortcut

Split balanced

Compress matrices with an unknown number of non-zeros

reb
Baunsgaard committed Aug 8, 2023
1 parent e045582 commit 965aaa7
Showing 74 changed files with 3,599 additions and 527 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -236,7 +236,7 @@
<mainClass>org.apache.sysds.performance.Main</mainClass>
</manifest>
<manifestEntries>
<Class-Path>SystemDS.jar SystemDS-tests.jar</Class-Path>
<Class-Path>SystemDS.jar ${project.build.directory}/${project.artifactId}-${project.version}-tests.jar</Class-Path>
</manifestEntries>
</archive>
</configuration>
9 changes: 5 additions & 4 deletions scripts/builtin/splitBalanced.dml
@@ -43,9 +43,9 @@ return (Matrix[Double] X_train, Matrix[Double] y_train, Matrix[Double] X_test,
Matrix[Double] y_test)
{

classes = table(Y, 1)
XY = order(target = cbind(Y, X), by = 1, decreasing=FALSE, index.return=FALSE)
# get the class count
classes = table(XY[, 1], 1)
split = floor(nrow(X) * splitRatio)
start_class = 1
train_row_s = 1
@@ -70,13 +70,14 @@ return (Matrix[Double] X_train, Matrix[Double] y_train, Matrix[Double] X_test,
{
end_class = end_class + as.scalar(classes[i])
class_t = XY[start_class:end_class, ]
ratio = as.scalar(classes_ratio_train[i])

train_row_e = train_row_e + as.scalar(classes_ratio_train[i])
train_row_e = train_row_e + ratio
test_row_e = test_row_e + as.scalar(classes_ratio_test[i])

outTrain[train_row_s:train_row_e, ] = class_t[1:as.scalar(classes_ratio_train[i]), ]
outTrain[train_row_s:train_row_e, ] = class_t[1:ratio, ]

outTest[test_row_s:test_row_e, ] = class_t[as.scalar(classes_ratio_train[i])+1:nrow(class_t), ]
outTest[test_row_s:test_row_e, ] = class_t[ratio+1:nrow(class_t), ]

train_row_s = train_row_e + 1
test_row_s = test_row_e + 1
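As a clarifying aside: the splitBalanced change above counts classes on the sorted label column (XY[, 1]) and hoists the per-class train count into ratio, so each contiguous class block is sliced once into train and test row ranges. The following standalone Java sketch reproduces that slicing arithmetic; the class counts, the floor(count * splitRatio) rounding, and all names are illustrative assumptions, not the script's exact classes_ratio_train computation.

// Minimal sketch (illustrative, not SystemDS code) of the stratified-split arithmetic:
// rows are ordered by label, so each class is a contiguous block cut into train/test parts.
public class BalancedSplitSketch {
	public static void main(String[] args) {
		int[] classCounts = {6, 4, 10}; // hypothetical per-class row counts, i.e. table(XY[, 1], 1)
		double splitRatio = 0.7;

		int startClass = 1;             // 1-based indexing, mirroring the DML script
		int trainRowS = 1, testRowS = 1;
		for(int i = 0; i < classCounts.length; i++) {
			int endClass = startClass + classCounts[i] - 1;
			int ratio = (int) Math.floor(classCounts[i] * splitRatio); // rows of this class sent to train
			int trainRowE = trainRowS + ratio - 1;
			int testRowE = testRowS + (classCounts[i] - ratio) - 1;
			System.out.printf("class %d: rows %d-%d -> train[%d-%d], test[%d-%d]%n",
				i + 1, startClass, endClass, trainRowS, trainRowE, testRowS, testRowE);
			startClass = endClass + 1;
			trainRowS = trainRowE + 1;
			testRowS = testRowE + 1;
		}
	}
}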
@@ -64,6 +64,7 @@
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.data.SparseRow;
import org.apache.sysds.runtime.functionobjects.SwapIndex;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CM_COV_Object;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
@@ -77,7 +78,6 @@
import org.apache.sysds.runtime.matrix.data.RandomMatrixGenerator;
import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
import org.apache.sysds.runtime.matrix.operators.AggregateTernaryOperator;
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
import org.apache.sysds.runtime.matrix.operators.CMOperator;
@@ -587,11 +587,21 @@ else if(isOverlapping()) {

@Override
public MatrixBlock reorgOperations(ReorgOperator op, MatrixValue ret, int startRow, int startColumn, int length) {
// Allow transpose to be compressed output. In general we need to have a transposed flag on
// the compressed matrix. https://issues.apache.org/jira/browse/SYSTEMDS-3025
printDecompressWarning(op.getClass().getSimpleName() + " -- " + op.fn.getClass().getSimpleName());
MatrixBlock tmp = decompress(op.getNumThreads());
return tmp.reorgOperations(op, ret, startRow, startColumn, length);
if(op.fn instanceof SwapIndex && this.getNumColumns() == 1) {
MatrixBlock tmp = decompress(op.getNumThreads());
long nz = tmp.setNonZeros(tmp.getNonZeros());
tmp = new MatrixBlock(tmp.getNumColumns(), tmp.getNumRows(), tmp.getDenseBlockValues());
tmp.setNonZeros(nz);
return tmp;
}
else {
// Allow transpose to be compressed output. In general we need to have a transposed flag on
// the compressed matrix. https://issues.apache.org/jira/browse/SYSTEMDS-3025
String message = op.getClass().getSimpleName() + " -- " + op.fn.getClass().getSimpleName();
MatrixBlock tmp = getUncompressed(message, op.getNumThreads());
return tmp.reorgOperations(op, ret, startRow, startColumn, length);
}

}

public boolean isOverlapping() {
@@ -788,24 +798,6 @@ public MatrixBlock sortOperations(MatrixValue weights, MatrixBlock result) {
return getUncompressed("sortOperations").sortOperations(right, result);
}

@Override
public MatrixBlock aggregateTernaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock m3, MatrixBlock ret,
AggregateTernaryOperator op, boolean inCP) {
boolean m1C = m1 instanceof CompressedMatrixBlock;
boolean m2C = m2 instanceof CompressedMatrixBlock;
boolean m3C = m3 instanceof CompressedMatrixBlock;
printDecompressWarning("aggregateTernaryOperations " + op.aggOp.getClass().getSimpleName() + " "
+ op.indexFn.getClass().getSimpleName() + " " + op.aggOp.increOp.fn.getClass().getSimpleName() + " "
+ op.binaryFn.getClass().getSimpleName() + " m1,m2,m3 " + m1C + " " + m2C + " " + m3C);
MatrixBlock left = getUncompressed(m1);
MatrixBlock right1 = getUncompressed(m2);
MatrixBlock right2 = getUncompressed(m3);
ret = left.aggregateTernaryOperations(left, right1, right2, ret, op, inCP);
if(ret.getNumRows() == 0 || ret.getNumColumns() == 0)
throw new DMLCompressionException("Invalid output");
return ret;
}

@Override
public MatrixBlock uaggouterchainOperations(MatrixBlock mbLeft, MatrixBlock mbRight, MatrixBlock mbOut,
BinaryOperator bOp, AggregateUnaryOperator uaggOp) {
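The SwapIndex branch added to reorgOperations above transposes a single-column compressed matrix by decompressing it and reusing the dense value array as a 1 x n block, instead of falling back to a full decompress-and-reorg. A minimal standalone sketch of that dimension-swap idea on a plain double[] (hypothetical names, not the SystemDS MatrixBlock API):

// Minimal sketch: an n x 1 dense block and a 1 x n dense block share the same
// row-major value array, so a column-vector transpose is only a dimension swap.
public class ColumnTransposeSketch {
	public static void main(String[] args) {
		double[] values = {1.0, 2.0, 3.0, 4.0}; // dense values of a 4 x 1 column vector
		int rows = 4, cols = 1;

		// "Transpose" by swapping dimensions; the backing array is reused unchanged.
		int tRows = cols, tCols = rows;
		for(int r = 0; r < tRows; r++)
			for(int c = 0; c < tCols; c++)
				System.out.printf("t[%d][%d] = %.1f%n", r, c, values[r * tCols + c]);
	}
}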
@@ -265,8 +265,10 @@ public static CompressedMatrixBlock createConstant(int numRows, int numCols, dou
}

private Pair<MatrixBlock, CompressionStatistics> compressMatrix() {
if(mb.getNonZeros() < 0)
throw new DMLCompressionException("Invalid to compress matrices with unknown nonZeros");
if(mb.getNonZeros() < 0) {
LOG.warn("Recomputing non-zeros since it is unknown in compression");
mb.recomputeNonZeros();
}
else if(mb instanceof CompressedMatrixBlock && ((CompressedMatrixBlock) mb).isOverlapping()) {
LOG.warn("Unsupported recompression of overlapping compression");
return new ImmutablePair<>(mb, null);
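The factory change above matches the commit note about compressing matrices with an unknown number of non-zeros: instead of throwing, the non-zero count is now recomputed with a warning. A short hedged sketch of the same guard pattern with hypothetical types, only to show the control flow:

// Minimal sketch (hypothetical, not the SystemDS factory): repair an unknown
// non-zero count before compression instead of rejecting the input.
public class NnzGuardSketch {
	static long countNonZeros(double[] values) {
		long nnz = 0;
		for(double v : values)
			if(v != 0)
				nnz++;
		return nnz;
	}

	public static void main(String[] args) {
		double[] block = {0.0, 3.5, 0.0, 1.0};
		long nnz = -1; // a negative count marks "unknown", as in the diff above
		if(nnz < 0) {
			System.out.println("Recomputing non-zeros since it is unknown in compression");
			nnz = countNonZeros(block);
		}
		System.out.println("nnz = " + nnz); // compression proceeds with the repaired count
	}
}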
@@ -48,8 +48,7 @@ public class CoCodePriorityQue extends AColumnCoCoder {

private static final int COL_COMBINE_THRESHOLD = 1024;

protected CoCodePriorityQue(AComEst sizeEstimator, ACostEstimate costEstimator,
CompressionSettings cs) {
protected CoCodePriorityQue(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs) {
super(sizeEstimator, costEstimator, cs);
}

@@ -59,8 +58,8 @@ protected CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, int k) {
return colInfos;
}

protected static List<CompressedSizeInfoColGroup> join(List<CompressedSizeInfoColGroup> groups,
AComEst sEst, ACostEstimate cEst, int minNumGroups, int k) {
protected static List<CompressedSizeInfoColGroup> join(List<CompressedSizeInfoColGroup> groups, AComEst sEst,
ACostEstimate cEst, int minNumGroups, int k) {

if(groups.size() > COL_COMBINE_THRESHOLD && k > 1)
return combineMultiThreaded(groups, sEst, cEst, minNumGroups, k);
@@ -111,37 +110,53 @@ private static List<CompressedSizeInfoColGroup> combineBlock(List<CompressedSize
return combineBlock(que, sEst, cEst, minNumGroups);
}

private static List<CompressedSizeInfoColGroup> combineBlock(Queue<CompressedSizeInfoColGroup> que,
AComEst sEst, ACostEstimate cEst, int minNumGroups) {
private static List<CompressedSizeInfoColGroup> combineBlock(Queue<CompressedSizeInfoColGroup> que, AComEst sEst,
ACostEstimate cEst, int minNumGroups) {

List<CompressedSizeInfoColGroup> ret = new ArrayList<>();
CompressedSizeInfoColGroup l = null;
l = que.poll();
int groupNr = ret.size() + que.size();
while(que.peek() != null && groupNr >= minNumGroups) {
int lastCombine = 0; // abort co-coding if we have not combined anything in the last 5 tries

while(que.peek() != null && groupNr >= minNumGroups && lastCombine < 5) {
CompressedSizeInfoColGroup r = que.peek();
CompressedSizeInfoColGroup g = sEst.combine(l, r);

if(g != null) {
double costOfJoin = cEst.getCost(g);
double costIndividual = cEst.getCost(l) + cEst.getCost(r);

if(costOfJoin < costIndividual) {
que.poll();
int numColumns = g.getColumns().size();
if(numColumns > 128)
if(numColumns > 128){
lastCombine++;
ret.add(g);
else
}
else{
lastCombine = 0;
que.add(g);
}
}
else
else{
lastCombine++;
ret.add(l);
}
}
else
else{
lastCombine++;
ret.add(l);
}

l = que.poll();
groupNr = ret.size() + que.size();
}
while(que.peek() != null){
// empty que
ret.add(l);
l = que.poll();
}

if(l != null)
ret.add(l);
@@ -153,11 +168,15 @@ private static List<CompressedSizeInfoColGroup> combineBlock(Queue<CompressedSiz
}

private static Queue<CompressedSizeInfoColGroup> getQue(int size, ACostEstimate cEst) {
Comparator<CompressedSizeInfoColGroup> comp = Comparator.comparing(x -> cEst.getCost(x));
Comparator<CompressedSizeInfoColGroup> comp = Comparator.comparing(x -> getCost(x, cEst));
Queue<CompressedSizeInfoColGroup> que = new PriorityQueue<>(size, comp);
return que;
}

private static double getCost(CompressedSizeInfoColGroup x, ACostEstimate cEst) {
return cEst.getCost(x) + x.getColumns().avgOfIndex() / 100000;
}

protected static class PQTask implements Callable<List<CompressedSizeInfoColGroup>> {

private final List<CompressedSizeInfoColGroup> _groups;
@@ -167,8 +186,8 @@ protected static class PQTask implements Callable<List<CompressedSizeInfoColGrou
private final ACostEstimate _cEst;
private final int _minNumGroups;

protected PQTask(List<CompressedSizeInfoColGroup> groups, int start, int end, AComEst sEst,
ACostEstimate cEst, int minNumGroups) {
protected PQTask(List<CompressedSizeInfoColGroup> groups, int start, int end, AComEst sEst, ACostEstimate cEst,
int minNumGroups) {
_groups = groups;
_start = start;
_end = end;
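The CoCodePriorityQue changes above add two things: a lastCombine counter that aborts co-coding after five consecutive rejected merges (the commit's priority-queue shortcut), and a small index-based tie-breaker in the queue's cost comparator. The following self-contained sketch shows the shape of that greedy loop with hypothetical groups and a made-up combine/cost function; it deliberately omits the real estimators, the minNumGroups bound, the 128-column cap, and the tie-breaker.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

// Minimal sketch of priority-queue co-coding with the early-abort shortcut:
// repeatedly try to merge the two cheapest groups and stop once five merge
// attempts in a row were rejected. Groups and costs are stand-ins.
public class PriorityQueCoCodeSketch {
	record Group(String name, double cost) {}

	static Group combine(Group a, Group b) {
		// made-up combine: merging "similar" groups (same name prefix) is cheaper than keeping both
		double merged = (a.cost() + b.cost()) * (a.name().charAt(0) == b.name().charAt(0) ? 0.8 : 1.1);
		return new Group(a.name() + "+" + b.name(), merged);
	}

	public static void main(String[] args) {
		Queue<Group> que = new PriorityQueue<>(Comparator.comparingDouble(Group::cost));
		List.of(new Group("a1", 4), new Group("a2", 5), new Group("b1", 3), new Group("b2", 6))
			.forEach(que::add);

		List<Group> ret = new ArrayList<>();
		Group l = que.poll();
		int lastCombine = 0; // abort co-coding if the last 5 attempts did not combine anything
		while(que.peek() != null && lastCombine < 5) {
			Group r = que.peek();
			Group g = combine(l, r);
			if(g.cost() < l.cost() + r.cost()) { // merge pays off: keep the merged group in play
				que.poll();
				que.add(g);
				lastCombine = 0;
			}
			else { // merge rejected: emit the cheapest group and count the failure
				lastCombine++;
				ret.add(l);
			}
			l = que.poll();
		}
		while(l != null) { // drain whatever is left once we stop merging
			ret.add(l);
			l = que.poll();
		}
		ret.forEach(g -> System.out.println(g.name() + " cost=" + g.cost()));
	}
}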
@@ -22,16 +22,19 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
import org.apache.sysds.runtime.compress.cost.ACostEstimate;
import org.apache.sysds.runtime.compress.estim.AComEst;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
import org.apache.sysds.runtime.compress.utils.IntArrayList;

public interface CoCoderFactory {
public static final Log LOG = LogFactory.getLog(AColumnCoCoder.class.getName());

/**
* The Valid coCoding techniques
@@ -53,51 +56,69 @@ public enum PartitionerType {
* @param cs The compression settings used in the compression.
* @return The estimated (hopefully) best groups of ColGroups.
*/
public static CompressedSizeInfo findCoCodesByPartitioning(AComEst est, CompressedSizeInfo colInfos,
int k, ACostEstimate costEstimator, CompressionSettings cs) {
public static CompressedSizeInfo findCoCodesByPartitioning(AComEst est, CompressedSizeInfo colInfos, int k,
ACostEstimate costEstimator, CompressionSettings cs) {

// Use column group partitioner to create partitions of columns
AColumnCoCoder co = createColumnGroupPartitioner(cs.columnPartitioner, est, costEstimator, cs);

// Find out if any of the groups are empty.
boolean containsEmpty = false;
for(CompressedSizeInfoColGroup g : colInfos.compressionInfo) {
if(g.isEmpty()) {
containsEmpty = true;
break;
}
}
final boolean containsEmptyOrConst = containsEmptyOrConst(colInfos);

// if there are no empty columns then try cocode algorithms for all columns
if(!containsEmpty)
// if there are no empty or const columns then try cocode algorithms for all columns
if(!containsEmptyOrConst)
return co.coCodeColumns(colInfos, k);
else {
// filtered empty groups
final List<IColIndex> emptyCols = new ArrayList<>();
// filtered const groups
final List<IColIndex> constCols = new ArrayList<>();
// filtered groups -- in the end starting with all groups
final List<CompressedSizeInfoColGroup> groups = new ArrayList<>();

final int nRow = colInfos.compressionInfo.get(0).getNumRows();

// filter groups
for(int i = 0; i < colInfos.compressionInfo.size(); i++) {
CompressedSizeInfoColGroup g = colInfos.compressionInfo.get(i);
if(g.isEmpty())
emptyCols.add(g.getColumns());
else if(g.isConst())
constCols.add(g.getColumns());
else
groups.add(g);
}

// extract all empty columns
IntArrayList emptyCols = new IntArrayList();
List<CompressedSizeInfoColGroup> notEmpty = new ArrayList<>();
// overwrite groups.
colInfos.compressionInfo = groups;

// cocode remaining groups
if(!groups.isEmpty()) {
colInfos = co.coCodeColumns(colInfos, k);
}

for(CompressedSizeInfoColGroup g : colInfos.compressionInfo) {
if(g.isEmpty())
emptyCols.appendValue(g.getColumns().get(0));
else
notEmpty.add(g);
}
// add empty
if(emptyCols.size() > 0) {
final IColIndex idx = ColIndexFactory.combineIndexes(emptyCols);
colInfos.compressionInfo.add(new CompressedSizeInfoColGroup(idx, nRow, CompressionType.EMPTY));
}

final int nRow = colInfos.compressionInfo.get(0).getNumRows();
// add const
if(constCols.size() > 0) {
final IColIndex idx = ColIndexFactory.combineIndexes(constCols);
colInfos.compressionInfo.add(new CompressedSizeInfoColGroup(idx, nRow, CompressionType.CONST));
}

final IColIndex idx = ColIndexFactory.create(emptyCols);
if(notEmpty.isEmpty()) { // if all empty (unlikely but could happen)
CompressedSizeInfoColGroup empty = new CompressedSizeInfoColGroup(idx, nRow);
return new CompressedSizeInfo(empty);
}
return colInfos;

// cocode all not empty columns
colInfos.compressionInfo = notEmpty;
colInfos = co.coCodeColumns(colInfos, k);
}
}

// add empty columns back as single columns
colInfos.compressionInfo.add(new CompressedSizeInfoColGroup(idx, nRow));
return colInfos;
private static boolean containsEmptyOrConst(CompressedSizeInfo colInfos) {
for(CompressedSizeInfoColGroup g : colInfos.compressionInfo)
if(g.isEmpty() || g.isConst())
return true;
return false;
}

private static AColumnCoCoder createColumnGroupPartitioner(PartitionerType type, AComEst est,
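The CoCoderFactory change above generalizes the old empty-column handling: both empty and constant column groups are set aside before co-coding and appended back afterwards as one combined EMPTY and one combined CONST group. A small sketch of that partition-then-reassemble flow with hypothetical stand-in types (not the SystemDS classes):

import java.util.ArrayList;
import java.util.List;

// Minimal sketch: filter out empty and constant groups, co-code only the remaining
// groups, then append one combined EMPTY group and one combined CONST group.
public class EmptyConstFilterSketch {
	enum Kind { EMPTY, CONST, OTHER }
	record Group(String cols, Kind kind) {}

	static List<Group> coCode(List<Group> groups) {
		return groups; // placeholder for the actual co-coding algorithm
	}

	public static void main(String[] args) {
		List<Group> input = List.of(new Group("0", Kind.EMPTY), new Group("1", Kind.OTHER),
			new Group("2", Kind.CONST), new Group("3", Kind.EMPTY), new Group("4", Kind.OTHER));

		List<String> emptyCols = new ArrayList<>();
		List<String> constCols = new ArrayList<>();
		List<Group> groups = new ArrayList<>();
		for(Group g : input) {
			if(g.kind() == Kind.EMPTY)
				emptyCols.add(g.cols());
			else if(g.kind() == Kind.CONST)
				constCols.add(g.cols());
			else
				groups.add(g);
		}

		List<Group> out = new ArrayList<>(coCode(groups)); // co-code only the informative groups
		if(!emptyCols.isEmpty())
			out.add(new Group(String.join(",", emptyCols), Kind.EMPTY)); // one combined empty group
		if(!constCols.isEmpty())
			out.add(new Group(String.join(",", constCols), Kind.CONST)); // one combined const group

		out.forEach(System.out::println);
	}
}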
@@ -45,10 +45,6 @@ public boolean contains(ColIndexes a, ColIndexes b) {

if(a == null || b == null)
return false;
int id = _indexes.findIndex(a._indexes.get(0));
if(id >= 0)
return true;
id = _indexes.findIndex(b._indexes.get(0));
return id >= 0;
return _indexes.contains(a._indexes.get(0)) || _indexes.contains(b._indexes.get(0));
}
}
@@ -22,6 +22,8 @@
import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
import org.apache.sysds.runtime.compress.colgroup.offset.AOffset;
import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme;
import org.apache.sysds.runtime.compress.colgroup.scheme.SDCScheme;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
import org.apache.sysds.runtime.compress.estim.EstimationFactors;

@@ -62,4 +64,9 @@ public final CompressedSizeInfoColGroup getCompressionInfo(int nRow) {
EstimationFactors ef = new EstimationFactors(getNumValues(), _numRows, getNumberOffsets(), _dict.getSparsity());
return new CompressedSizeInfoColGroup(_colIndexes, ef, nRow, getCompType(),getEncoding());
}

@Override
public ICLAScheme getCompressionScheme() {
return SDCScheme.create(this);
}
}
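The new getCompressionScheme() override ties this SDC column group into the commit's scheme-apply work: a scheme extracted once from an already-compressed group can be reused to encode later blocks with the same column structure, which is presumably what the SchemaApplyPerfTests in the commit title measure. Below is a hedged, self-contained sketch of that idea with a hypothetical Scheme interface; the real ICLAScheme/SDCScheme API may differ.

// Minimal sketch of the scheme-apply idea: build an "SDC-like" scheme once (one
// dominant default value plus a small dictionary), then reuse it to encode new
// column data. The interface and helper are assumptions, not SystemDS code.
public class SchemeApplySketch {
	interface Scheme {
		int[] encode(double[] column); // map raw values to dictionary codes
	}

	static Scheme sdcLike(double defaultValue, double[] dict) {
		return column -> {
			int[] codes = new int[column.length];
			for(int i = 0; i < column.length; i++) {
				codes[i] = -1; // -1 stands for the default value (no offset stored for it)
				for(int d = 0; d < dict.length; d++)
					if(column[i] == dict[d] && column[i] != defaultValue)
						codes[i] = d;
			}
			return codes;
		};
	}

	public static void main(String[] args) {
		Scheme scheme = sdcLike(0.0, new double[] {3.0, 7.0}); // built once, e.g. from a compressed group
		double[][] incoming = {{0, 3, 0, 7}, {7, 0, 0, 3}};    // later blocks with the same schema
		for(double[] block : incoming)
			System.out.println(java.util.Arrays.toString(scheme.encode(block)));
	}
}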