Skip to content

Commit

Permalink
Slice inside task
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Aug 15, 2023
1 parent 2727903 commit f5af021
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,12 @@ private void writeMultiBlockCompressedParallel(MatrixBlock b, final int rlen, fi
final CompressedMatrixBlock rmc = new CompressedMatrixBlock(mc.getNumRows(), mc.getNumColumns(),
mc.getNonZeros(), false, s.indexStructures);
// slice out row blocks in this.
List<MatrixBlock> blocks = CLALibSlice.sliceBlocks(rmc, blen, 1); // Slice compressed blocks
final int blocksPerThread = Math.max(1, blocks.size() / k);
for(int block = 0; block < blocks.size(); block += blocksPerThread) {
WriteTask we = new WriteTask(i++ % k, blocks, bc, block,
Math.min(blocks.size(), block + blocksPerThread));
// List<MatrixBlock> blocks = CLALibSlice.sliceBlocks(rmc, blen, 1); // Slice compressed blocks
final int nBlocks = (int) Math.ceil((double) rlen / blen);
final int blocksPerThread = Math.max(1, nBlocks / k);
for(int block = 0; block < nBlocks; block += blocksPerThread) {
WriteTask we = new WriteTask(i++ % k, rmc, bc, block, Math.min(nBlocks, block + blocksPerThread),
blen);
tasks.add(we);
}
tasks.add(new DictWriteTask(fname, s.dicts, bc));
Expand Down Expand Up @@ -336,28 +337,32 @@ private static void cleanup(Path path) throws IOException {

private class WriteTask implements Callable<Object> {
final int id;
final List<MatrixBlock> blocks;
final CompressedMatrixBlock rmc;
final int bc;
final int bl;
final int bu;
final int blen;

private WriteTask(int id, List<MatrixBlock> blocks, int bc, int bl, int bu) {
private WriteTask(int id, CompressedMatrixBlock rmc, int bc, int bl, int bu, int blen) {
this.id = id;
this.blocks = blocks;
this.rmc = rmc;
// +1 for one indexed
this.bl = bl + 1;
this.bu = bu + 1;
this.bc = bc + 1;
this.blen = blen;
}

@Override
public Object call() throws Exception {
writerLocks[id].lock();
try {
Writer w = writers[id].get();
final int nrow = rmc.getNumRows();
for(int b = bl; b < bu; b++) {
MatrixIndexes index = new MatrixIndexes(b, bc);
CompressedWriteBlock blk = new CompressedWriteBlock(blocks.get(b - 1));
MatrixBlock cb = CLALibSlice.sliceRowsCompressed(rmc, (b - 1) * blen, Math.min(b * blen, nrow) - 1);
CompressedWriteBlock blk = new CompressedWriteBlock(cb);
w.append(index, blk);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private static MatrixBlock sliceRowsDecompress(CompressedMatrixBlock cmb, int rl
return tmp;
}

private static MatrixBlock sliceRowsCompressed(CompressedMatrixBlock cmb, int rl, int ru) {
public static MatrixBlock sliceRowsCompressed(CompressedMatrixBlock cmb, int rl, int ru) {
final List<AColGroup> groups = cmb.getColGroups();
final List<AColGroup> newColGroups = new ArrayList<>(groups.size());
final List<IColIndex> emptyGroups = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.apache.sysds.runtime.compress.utils;

import java.util.Arrays;

import org.apache.sysds.runtime.compress.utils.ACount.DCounts;

public class DoubleCountHashMap extends ACountHashMap<Double> {
Expand Down

0 comments on commit f5af021

Please sign in to comment.