Commit 169c612
fix parallel issue in spark
Baunsgaard committed Aug 20, 2023
1 parent aaf7e78 commit 169c612
Showing 5 changed files with 43 additions and 23 deletions.
src/main/java/org/apache/sysds/runtime/compress/io/CompressedWriteBlock.java
@@ -22,6 +22,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
@@ -31,7 +32,7 @@
  * Write block for serializing either a instance of MatrixBlock or CompressedMatrixBlock, To allow spark to read in
  * either or.
  */
-public class CompressedWriteBlock implements WritableComparable<CompressedWriteBlock> {
+public class CompressedWriteBlock implements WritableComparable<CompressedWriteBlock>, Serializable {
 
 	public MatrixBlock mb;
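Why the Serializable addition matters: Spark's default JavaSerializer has to ship these wrapper objects between executors whenever the RDD is shuffled or cached, and Hadoop's Writable contract alone does not make a class Java-serializable, so a parallel job can fail with java.io.NotSerializableException. A minimal sketch of the combined contract (illustrative class name and payload, not the SystemDS code):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;

import org.apache.hadoop.io.WritableComparable;

public class WriteBlockSketch implements WritableComparable<WriteBlockSketch>, Serializable {
	private static final long serialVersionUID = 1L;

	private long payload; // stand-in for the wrapped matrix block

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(payload); // Hadoop SequenceFile serialization
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		payload = in.readLong();
	}

	@Override
	public int compareTo(WriteBlockSketch o) {
		return Long.compare(payload, o.payload);
	}
}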
src/main/java/org/apache/sysds/runtime/compress/io/ReaderSparkCompressed.java
@@ -44,8 +44,9 @@ public static JavaPairRDD<MatrixIndexes, MatrixBlock> getRDD(JavaSparkContext sc
 
 	final String dictName = fileName + ".dict";
 
-	JavaPairRDD<MatrixIndexes, CompressedWriteBlock> cmbRdd = sc.hadoopFile(fileName, SequenceFileInputFormat.class,
-		MatrixIndexes.class, CompressedWriteBlock.class);
+	JavaPairRDD<MatrixIndexes, MatrixBlock> cmbRdd = sc
+		.hadoopFile(fileName, SequenceFileInputFormat.class, MatrixIndexes.class, CompressedWriteBlock.class)
+		.mapValues(new CompressUnwrap());
 
 	if(HDFSTool.existsFileOnHDFS(dictName)) {
 
@@ -55,25 +56,22 @@
 		return combineRdds(cmbRdd, dictsRdd);
 	}
 	else {
-		return cmbRdd.mapValues(new CompressUnwrap());
+		return cmbRdd;
 	}
 
 }
 
-private static JavaPairRDD<MatrixIndexes, MatrixBlock> combineRdds(
-	JavaPairRDD<MatrixIndexes, CompressedWriteBlock> cmbRdd, JavaPairRDD<DictWritable.K, DictWritable> dictsRdd) {
+private static JavaPairRDD<MatrixIndexes, MatrixBlock> combineRdds(JavaPairRDD<MatrixIndexes, MatrixBlock> cmbRdd,
+	JavaPairRDD<DictWritable.K, DictWritable> dictsRdd) {
 	// combine the elements
-	JavaPairRDD<MatrixIndexes, MatrixBlock> mbrdd = cmbRdd.mapValues(new CompressUnwrap());
 	JavaPairRDD<Integer, List<IDictionary>> dictsUnpacked = dictsRdd
 		.mapToPair((t) -> new Tuple2<>(Integer.valueOf(t._1.id + 1), t._2.dicts));
-	JavaPairRDD<Integer, Tuple2<MatrixIndexes, MatrixBlock>> mbrddC = mbrdd
-		.mapToPair((t) -> new Tuple2<>(Integer.valueOf((int) t._1.getColumnIndex()), t));
+	JavaPairRDD<Integer, Tuple2<MatrixIndexes, MatrixBlock>> mbrddC = cmbRdd
+		.mapToPair((t) -> new Tuple2<>(Integer.valueOf((int) t._1.getColumnIndex()),
+			new Tuple2<>(new MatrixIndexes(t._1), t._2)));
 
-	JavaPairRDD<Integer, Tuple2<Tuple2<MatrixIndexes, MatrixBlock>, List<IDictionary>>> j = mbrddC
-		.join(dictsUnpacked);
+	return mbrddC.join(dictsUnpacked).mapToPair(ReaderSparkCompressed::combineTuples);
 
-	JavaPairRDD<MatrixIndexes, MatrixBlock> ret = j.mapToPair(ReaderSparkCompressed::combineTuples);
-	return ret;
 }
 
 private static Tuple2<MatrixIndexes, MatrixBlock> combineTuples(
@@ -82,7 +80,7 @@ private static Tuple2<MatrixIndexes, MatrixBlock> combineTuples(
 	MatrixBlock mbIn = e._2._1._2;
 	List<IDictionary> dictsIn = e._2._2;
 	MatrixBlock ob = combineMatrixBlockAndDict(mbIn, dictsIn);
-	return new Tuple2<>(kOut, ob);
+	return new Tuple2<>(new MatrixIndexes(kOut), ob);
 }
 
 private static MatrixBlock combineMatrixBlockAndDict(MatrixBlock mb, List<IDictionary> dicts) {
@@ -100,8 +98,6 @@ private static MatrixBlock combineMatrixBlockAndDict(MatrixBlock mb, List<IDictionary> dicts) {
 		}
 	}
 	else {
-		LOG.error(dicts.size());
-		LOG.error(gs.size());
 		int gis = 0;
 		for(int i = 0; i < gs.size(); i++) {
 			AColGroup g = gs.get(i);
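The reader changes do two related things. First, getRDD now unwraps each CompressedWriteBlock into a MatrixBlock immediately after sc.hadoopFile, so downstream code sees one value type whether or not a dictionary sidecar file exists. Second, combineRdds and combineTuples copy the MatrixIndexes keys (new MatrixIndexes(t._1), new MatrixIndexes(kOut)) around the join. Defensive copies like these are the standard guard against Hadoop record-reader object reuse, where one key/value instance is mutated in place for every record; any operation that buffers records across calls (join, groupBy, collect) otherwise ends up with many tuples aliasing the same mutated object. A minimal sketch of the pitfall and the guard (illustrative types, not the SystemDS reader):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class ObjectReuseSketch {
	// SequenceFile readers recycle a single key and value instance, so the
	// records must be copied into fresh, immutable objects before anything
	// buffers or shuffles them.
	@SuppressWarnings("unchecked")
	public static JavaPairRDD<Long, String> readSafely(JavaSparkContext sc, String path) {
		JavaPairRDD<LongWritable, Text> raw = sc.hadoopFile(
			path, SequenceFileInputFormat.class, LongWritable.class, Text.class);
		// Copy eagerly: after this map each tuple owns its data and can be
		// joined or collected without aliasing the reader's recycled buffers.
		return raw.mapToPair(t -> new Tuple2<>(t._1.get(), t._2.toString()));
	}
}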
src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
@@ -59,7 +59,7 @@
 
 public final class WriterCompressed extends MatrixWriter {
 
-	protected static final Log LOG = LogFactory.getLog(ReaderCompressed.class.getName());
+	protected static final Log LOG = LogFactory.getLog(WriterCompressed.class.getName());
 
 	private String fname;
 	private Future<Writer>[] writers;
@@ -235,7 +235,6 @@ private void writeMultiBlockCompressedParallel(MatrixBlock b, final int rlen, fi
 			final int j = i;
 			if(writers[i] == null) {
 				writers[i] = pool.submit(() -> {
-
 					return generateWriter(getPath(j));
 				});
 			}
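The first hunk fixes a copy-paste bug: the logger was created with ReaderCompressed's class name, so WriterCompressed log lines were attributed to the reader. For context on the second hunk, writers is an array of Future<Writer> filled in lazily, so per-file writers are created concurrently on the thread pool and only awaited when used. A runnable sketch of that pattern (simplified, hypothetical Writer type, not the SystemDS class):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LazyWriterPool {
	interface Writer extends AutoCloseable {}

	private final ExecutorService pool = Executors.newFixedThreadPool(4);
	private final Future<Writer>[] writers; // one slot per output file

	@SuppressWarnings("unchecked")
	LazyWriterPool(int n) {
		writers = new Future[n];
	}

	Writer get(int i) throws Exception {
		final int j = i; // effectively-final copy for the lambda, as in the diff
		if(writers[i] == null)
			writers[i] = pool.submit(() -> createWriter(j));
		return writers[i].get(); // blocks until this writer exists
	}

	private Writer createWriter(int idx) {
		return () -> {}; // stand-in for generateWriter(getPath(idx))
	}
}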
src/test/java/org/apache/sysds/test/component/compress/io/IOCompressionTestUtils.java
@@ -64,7 +64,6 @@ protected static void verifyEquivalence(MatrixBlock a, MatrixBlock b) {
 
 	public synchronized static MatrixBlock read(String path) {
 		try {
-			Thread.sleep(100);
 			return ReaderCompressed.readCompressedMatrixFromHDFS(path);
 		}
 		catch(Exception e) {
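The removed Thread.sleep(100) was a timing guess: it gave concurrent writers a head start instead of waiting for them to finish. With the serialization and configuration fixes elsewhere in this commit, the read path no longer appears to depend on such timing, and sleeps like this are better replaced by blocking on the task that represents the write. A small self-contained sketch of that alternative (stand-in job, not the SystemDS code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AwaitInsteadOfSleep {
	public static void main(String[] args) throws Exception {
		ExecutorService pool = Executors.newFixedThreadPool(2);
		// Stand-in for a concurrent compressed write.
		Future<String> write = pool.submit(() -> "write finished");
		// Deterministic happens-before edge: the read proceeds only after
		// the write completed, with no arbitrary delay.
		System.out.println(write.get());
		pool.shutdown();
	}
}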
src/test/java/org/apache/sysds/test/component/compress/io/IOSpark.java
@@ -32,20 +32,26 @@
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.compress.io.ReaderCompressed;
 import org.apache.sysds.runtime.compress.io.ReaderSparkCompressed;
 import org.apache.sysds.runtime.compress.io.WriterCompressed;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
 import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils;
+import org.apache.sysds.runtime.io.MatrixReader;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.test.TestUtils;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.Test;
 
 import scala.Tuple2;
@@ -58,11 +64,25 @@ public class IOSpark {
 	final static String nameBeginning = "src/test/java/org/apache/sysds/test/component/compress/io/files"
 		+ IOSpark.class.getSimpleName() + "/";
 
+	String before;
+
 	@AfterClass
 	public static void cleanup() {
 		IOCompressionTestUtils.deleteDirectory(new File(nameBeginning));
 	}
 
+	@After
+	public void after() {
+		ConfigurationManager.getDMLConfig().setTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS, before);
+		IOCompressionTestUtils.deleteDirectory(new File(nameBeginning));
+	}
+
+	@Before
+	public void setup() {
+		before = ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS);
+		ConfigurationManager.getDMLConfig().setTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS, "2");
+	}
+
 	private static String getName() {
 		String name = IOCompressionTestUtils.getName(nameBeginning);
 		IOCompressionTestUtils.deleteDirectory(new File(name));
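The new @Before/@After pair pins the local Spark parallelism to two threads for every test in the class and restores the prior value afterwards, so the override cannot leak into tests that run later or in parallel; the per-test copy of this logic further down in readRDDThroughSparkExecutionContext previously set the value but, as the last hunk shows, only now restores it. A generic sketch of the save/override/restore pattern (JUnit 4; the config accessors are hypothetical stand-ins for ConfigurationManager):

import org.junit.After;
import org.junit.Before;

public abstract class ConfigIsolationSketch {
	private String saved;

	protected abstract String getConfig(String key);
	protected abstract void setConfig(String key, String value);

	@Before
	public void saveAndOverride() {
		saved = getConfig("local.spark.threads"); // hypothetical key name
		setConfig("local.spark.threads", "2");    // deterministic parallelism
	}

	@After
	public void restore() {
		setConfig("local.spark.threads", saved); // undo the override
	}
}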
@@ -262,8 +282,10 @@ private void testWriteSparkReadCP(MatrixBlock mb, int blen1, int blen2) {
 		assertTrue(f.isFile() || f.isDirectory());
 		// Read in again as RDD
 		JavaPairRDD<MatrixIndexes, MatrixBlock> m = getRDD(f1);
+		MatrixReader r = ReaderCompressed.create();
+		MatrixBlock mb2 = r.readMatrixFromHDFS(f1, (long) mb.getNumRows(), (long) mb.getNumColumns(), blen1, -1L);
+		TestUtils.compareMatricesBitAvgDistance(mb, mb2, 0, 0);
 		String f2 = getName(); // get new name for writing RDD.
 
 		// Write RDD to disk
 		if(blen1 != blen2) {
 			DataCharacteristics mc = new MatrixCharacteristics(mb.getNumRows(), mb.getNumColumns(), blen1);
@@ -289,9 +311,7 @@ private void testReblock(MatrixBlock mb, int blen1, int blen2) {
 		Timing t = new Timing();
 
 		String f1 = getName();
-		Thread.sleep(100);
 		WriterCompressed.writeCompressedMatrixToHDFS(mb, f1, blen1);
-		Thread.sleep(100);
 
 		// Read in again as RDD
 		JavaPairRDD<MatrixIndexes, MatrixBlock> m = getRDD(f1); // Our starting point
@@ -336,10 +356,14 @@ private synchronized JavaPairRDD<MatrixIndexes, MatrixBlock> getRDD(String path)
@SuppressWarnings({"unchecked"})
public void readRDDThroughSparkExecutionContext(MatrixBlock mb, int blen) {
try {

String before = ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS);
ConfigurationManager.getDMLConfig().setTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS, "2");
String n = getName();

WriterCompressed.writeCompressedMatrixToHDFS(mb, n, blen);
MatrixReader r = ReaderCompressed.create();
MatrixBlock mb2 = r.readMatrixFromHDFS(n, (long) mb.getNumRows(), (long) mb.getNumColumns(), blen, -1L);
TestUtils.compareMatricesBitAvgDistance(mb, mb2, 0, 0);

SparkExecutionContext ec = ExecutionContextFactory.createSparkExecutionContext();

Expand All @@ -350,6 +374,7 @@ public void readRDDThroughSparkExecutionContext(MatrixBlock mb, int blen) {
.getRDDHandleForMatrixObject(obj, fmt);
List<Tuple2<MatrixIndexes, MatrixBlock>> c = m.collect();
verifySum(c, mb.sum(), 0.0001);
ConfigurationManager.getDMLConfig().setTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS, before);
}
catch(Exception e) {
e.printStackTrace();
