Skip to content

Commit

Permalink
cocode que shortcut
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Aug 2, 2023
1 parent 5c86f28 commit 76345bd
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,32 +117,46 @@ private static List<CompressedSizeInfoColGroup> combineBlock(Queue<CompressedSiz
CompressedSizeInfoColGroup l = null;
l = que.poll();
int groupNr = ret.size() + que.size();
while(que.peek() != null && groupNr >= minNumGroups) {
int lastCombine = 0; // if we have not combined in the last 5 tries abort cocoding.

while(que.peek() != null && groupNr >= minNumGroups && lastCombine < 5) {
CompressedSizeInfoColGroup r = que.peek();
CompressedSizeInfoColGroup g = sEst.combine(l, r);

// LOG.error(getCost(g, cEst) + " " + g.getColumns());
if(g != null) {
double costOfJoin = cEst.getCost(g);
double costIndividual = cEst.getCost(l) + cEst.getCost(r);

if(costOfJoin < costIndividual) {
que.poll();
int numColumns = g.getColumns().size();
if(numColumns > 128)
if(numColumns > 128){
lastCombine++;
ret.add(g);
else
}
else{
lastCombine = 0;
que.add(g);
}
}
else
else{
lastCombine++;
ret.add(l);
}
}
else
else{
lastCombine++;
ret.add(l);
}

l = que.poll();
groupNr = ret.size() + que.size();
}
while(que.peek() != null){
// empty que
ret.add(l);
l = que.poll();
}

if(l != null)
ret.add(l);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.common.Types.CorrectionLocationType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.codegen.SpoofOperator.SideInput;
import org.apache.sysds.runtime.codegen.SpoofOperator.SideInputSparseCell;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.DenseBlockFactory;
import org.apache.sysds.runtime.data.SparseBlock;
Expand Down Expand Up @@ -87,7 +88,7 @@
* TODO next opcode extensions: a+, colindexmax
*/
public class LibMatrixAgg {
// private static final Log LOG = LogFactory.getLog(LibMatrixAgg.class.getName());
protected static final Log LOG = LogFactory.getLog(LibMatrixAgg.class.getName());

//internal configuration parameters
private static final boolean NAN_AWARENESS = false;
Expand Down Expand Up @@ -630,14 +631,14 @@ public static boolean isSupportedUnaryOperator( UnaryOperator op ) {
}

public static boolean satisfiesMultiThreadingConstraints(MatrixBlock in, MatrixBlock out, AggregateUnaryOperator uaop, int k) {
boolean sharedTP = (InfrastructureAnalyzer.getLocalParallelism() == k);
boolean sharedTP = CommonThreadPool.isSharedTPThreads(k);
return k > 1 && out.isThreadSafe() && in.rlen > (sharedTP ? k/8 : k/2)
&& (uaop.indexFn instanceof ReduceCol || out.clen*8*k < PAR_INTERMEDIATE_SIZE_THRESHOLD) //size
&& in.nonZeros > (sharedTP ? PAR_NUMCELL_THRESHOLD2 : PAR_NUMCELL_THRESHOLD1);
}

public static boolean satisfiesMultiThreadingConstraints(MatrixBlock in,int k) {
boolean sharedTP = (InfrastructureAnalyzer.getLocalParallelism() == k);
public static boolean satisfiesMultiThreadingConstraints(MatrixBlock in, int k) {
boolean sharedTP = CommonThreadPool.isSharedTPThreads(k);
return k > 1 && in.rlen > (sharedTP ? k/8 : k/2)
&& in.nonZeros > (sharedTP ? PAR_NUMCELL_THRESHOLD2 : PAR_NUMCELL_THRESHOLD1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ else if(shared2 == null){
}
}

public static boolean isSharedTPThreads(int k){
return InfrastructureAnalyzer.getLocalParallelism() == k || shared2K ==k || shared2K == -1;
}

public static <T> void invokeAndShutdown(ExecutorService pool, Collection<? extends Callable<T>> tasks) {
try {
// execute tasks
Expand Down

0 comments on commit 76345bd

Please sign in to comment.