diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/CompressedWriteBlock.java b/src/main/java/org/apache/sysds/runtime/compress/io/CompressedWriteBlock.java
index 1ccbec7dcda..06600da46b3 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/io/CompressedWriteBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/CompressedWriteBlock.java
@@ -22,6 +22,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
@@ -31,7 +32,7 @@
  * Write block for serializing either a instance of MatrixBlock or CompressedMatrixBlock, To allow spark to read in
  * either or.
  */
-public class CompressedWriteBlock implements WritableComparable<CompressedWriteBlock> {
+public class CompressedWriteBlock implements WritableComparable<CompressedWriteBlock>, Serializable {
 
 	public MatrixBlock mb;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/ReaderSparkCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/ReaderSparkCompressed.java
index 428cd9a53fa..47c56d61855 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/io/ReaderSparkCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/ReaderSparkCompressed.java
@@ -44,8 +44,9 @@ public static JavaPairRDD<MatrixIndexes, MatrixBlock> getRDD(JavaSparkContext sc
 
 		final String dictName = fileName + ".dict";
 
-		JavaPairRDD<MatrixIndexes, CompressedWriteBlock> cmbRdd = sc.hadoopFile(fileName, SequenceFileInputFormat.class,
-			MatrixIndexes.class, CompressedWriteBlock.class);
+		JavaPairRDD<MatrixIndexes, MatrixBlock> cmbRdd = sc
+			.hadoopFile(fileName, SequenceFileInputFormat.class, MatrixIndexes.class, CompressedWriteBlock.class)
+			.mapValues(new CompressUnwrap());
 
 		if(HDFSTool.existsFileOnHDFS(dictName)) {
@@ -55,25 +56,22 @@ public static JavaPairRDD<MatrixIndexes, MatrixBlock> getRDD(JavaSparkContext sc
 			return combineRdds(cmbRdd, dictsRdd);
 		}
 		else {
-			return cmbRdd.mapValues(new CompressUnwrap());
+			return cmbRdd;
 		}
 	}
 
-	private static JavaPairRDD<MatrixIndexes, MatrixBlock> combineRdds(
-		JavaPairRDD<MatrixIndexes, CompressedWriteBlock> cmbRdd, JavaPairRDD<DictWritable.K, DictWritable> dictsRdd) {
+	private static JavaPairRDD<MatrixIndexes, MatrixBlock> combineRdds(JavaPairRDD<MatrixIndexes, MatrixBlock> cmbRdd,
+		JavaPairRDD<DictWritable.K, DictWritable> dictsRdd) {
 		// combine the elements
-		JavaPairRDD<MatrixIndexes, MatrixBlock> mbrdd = cmbRdd.mapValues(new CompressUnwrap());
 		JavaPairRDD<Integer, List<IDictionary>> dictsUnpacked = dictsRdd
 			.mapToPair((t) -> new Tuple2<>(Integer.valueOf(t._1.id + 1), t._2.dicts));
-		JavaPairRDD<Integer, Tuple2<MatrixIndexes, MatrixBlock>> mbrddC = mbrdd
-			.mapToPair((t) -> new Tuple2<>(Integer.valueOf((int) t._1.getColumnIndex()), t));
+		JavaPairRDD<Integer, Tuple2<MatrixIndexes, MatrixBlock>> mbrddC = cmbRdd
+			.mapToPair((t) -> new Tuple2<>(Integer.valueOf((int) t._1.getColumnIndex()),
+				new Tuple2<>(new MatrixIndexes(t._1), t._2)));
 
-		JavaPairRDD<Integer, Tuple2<Tuple2<MatrixIndexes, MatrixBlock>, List<IDictionary>>> j = mbrddC
-			.join(dictsUnpacked);
+		return mbrddC.join(dictsUnpacked).mapToPair(ReaderSparkCompressed::combineTuples);
 
-		JavaPairRDD<MatrixIndexes, MatrixBlock> ret = j.mapToPair(ReaderSparkCompressed::combineTuples);
-		return ret;
 	}
 
 	private static Tuple2<MatrixIndexes, MatrixBlock> combineTuples(
@@ -82,7 +80,7 @@ private static Tuple2<MatrixIndexes, MatrixBlock> combineTuples(
 		MatrixBlock mbIn = e._2._1._2;
 		List<IDictionary> dictsIn = e._2._2;
 		MatrixBlock ob = combineMatrixBlockAndDict(mbIn, dictsIn);
-		return new Tuple2<>(kOut, ob);
+		return new Tuple2<>(new MatrixIndexes(kOut), ob);
 	}
 
 	private static MatrixBlock combineMatrixBlockAndDict(MatrixBlock mb, List<IDictionary> dicts) {
@@ -100,8 +98,6 @@ private static MatrixBlock combineMatrixBlockAndDict(MatrixBlock mb, List<IDict
diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
--- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
 	private Future<Writer>[] writers;
@@ -235,7 +235,6 @@ private void writeMultiBlockCompressedParallel(MatrixBlock b, final int rlen, fi
 			final int j = i;
 			if(writers[i] == null) {
 				writers[i] = pool.submit(() -> {
-
 					return generateWriter(getPath(j));
 				});
 			}
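Reviewer note (not part of the patch): the copies introduced above, new MatrixIndexes(t._1) in combineRdds and new MatrixIndexes(kOut) in combineTuples, matter because an RDD created via sc.hadoopFile is backed by a Hadoop record reader that reuses one Writable key/value instance for every record; any tuple that outlives the current record must therefore hold its own copy. Likewise, CompressedWriteBlock now implements Serializable, presumably so Spark can Java-serialize the blocks when they are cached or shuffled, since Writable alone is not Serializable. A minimal sketch of the copy pattern, with a hypothetical class name, assuming the SystemDS classes referenced in this diff:

import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.sysds.runtime.compress.io.CompressUnwrap;
import org.apache.sysds.runtime.compress.io.CompressedWriteBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;

import scala.Tuple2;

public class ReuseCopySketch {
	// Sketch only: key the blocks by column index for a later join, copying
	// the reused Writable key instead of keeping the reused instance.
	@SuppressWarnings("unchecked")
	public static JavaPairRDD<Integer, Tuple2<MatrixIndexes, MatrixBlock>> keyByColumn(JavaSparkContext sc,
		String fileName) {
		JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = sc
			.hadoopFile(fileName, SequenceFileInputFormat.class, MatrixIndexes.class, CompressedWriteBlock.class)
			.mapValues(new CompressUnwrap());
		// The record reader may rewrite t._1 in place for the next record,
		// so a defensive copy is kept in the emitted tuple.
		return rdd.mapToPair(t -> new Tuple2<>(Integer.valueOf((int) t._1.getColumnIndex()),
			new Tuple2<>(new MatrixIndexes(t._1), t._2)));
	}
}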
diff --git a/src/test/java/org/apache/sysds/test/component/compress/io/IOCompressionTestUtils.java b/src/test/java/org/apache/sysds/test/component/compress/io/IOCompressionTestUtils.java
index e7e0d3b772d..ec147bc5dff 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/io/IOCompressionTestUtils.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/io/IOCompressionTestUtils.java
@@ -64,7 +64,6 @@ protected static void verifyEquivalence(MatrixBlock a, MatrixBlock b) {
 
 	public synchronized static MatrixBlock read(String path) {
 		try {
-			Thread.sleep(100);
 			return ReaderCompressed.readCompressedMatrixFromHDFS(path);
 		}
 		catch(Exception e) {
diff --git a/src/test/java/org/apache/sysds/test/component/compress/io/IOSpark.java b/src/test/java/org/apache/sysds/test/component/compress/io/IOSpark.java
index 27c33e35bed..4c128fdfd55 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/io/IOSpark.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/io/IOSpark.java
@@ -32,7 +32,10 @@
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.compress.io.ReaderCompressed;
 import org.apache.sysds.runtime.compress.io.ReaderSparkCompressed;
 import org.apache.sysds.runtime.compress.io.WriterCompressed;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
@@ -40,12 +43,15 @@
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
 import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils;
+import org.apache.sysds.runtime.io.MatrixReader;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.test.TestUtils;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.Test;
 
 import scala.Tuple2;
@@ -58,11 +64,25 @@ public class IOSpark {
 	final static String nameBeginning = "src/test/java/org/apache/sysds/test/component/compress/io/files"
 		+ IOSpark.class.getSimpleName() + "/";
 
+	String before;
+
 	@AfterClass
 	public static void cleanup() {
 		IOCompressionTestUtils.deleteDirectory(new File(nameBeginning));
 	}
 
+	@After
+	public void after() {
+		ConfigurationManager.getDMLConfig().setTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS, before);
+		IOCompressionTestUtils.deleteDirectory(new File(nameBeginning));
+	}
+
+	@Before
+	public void setup() {
+		before = ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS);
+		ConfigurationManager.getDMLConfig().setTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS, "2");
+	}
+
 	private static String getName() {
 		String name = IOCompressionTestUtils.getName(nameBeginning);
 		IOCompressionTestUtils.deleteDirectory(new File(name));
@@ -262,8 +282,10 @@ private void testWriteSparkReadCP(MatrixBlock mb, int blen1, int blen2) {
 		assertTrue(f.isFile() || f.isDirectory());
 		// Read in again as RDD
 		JavaPairRDD<MatrixIndexes, MatrixBlock> m = getRDD(f1);
+		MatrixReader r = ReaderCompressed.create();
+		MatrixBlock mb2 = r.readMatrixFromHDFS(f1, (long) mb.getNumRows(), (long) mb.getNumColumns(), blen1, -1L);
+		TestUtils.compareMatricesBitAvgDistance(mb, mb2, 0, 0);
 		String f2 = getName(); // get new name for writing RDD.
-
 		// Write RDD to disk
 		if(blen1 != blen2) {
 			DataCharacteristics mc = new MatrixCharacteristics(mb.getNumRows(), mb.getNumColumns(), blen1);
@@ -289,9 +311,7 @@ private void testReblock(MatrixBlock mb, int blen1, int blen2) {
 		Timing t = new Timing();
 		String f1 = getName();
-		Thread.sleep(100);
 		WriterCompressed.writeCompressedMatrixToHDFS(mb, f1, blen1);
-		Thread.sleep(100);
 		// Read in again as RDD
 		JavaPairRDD<MatrixIndexes, MatrixBlock> m = getRDD(f1); // Our starting point
@@ -336,10 +356,14 @@ private synchronized JavaPairRDD<MatrixIndexes, MatrixBlock> getRDD(String path)
 	@SuppressWarnings({"unchecked"})
 	public void readRDDThroughSparkExecutionContext(MatrixBlock mb, int blen) {
 		try {
-
+			String before = ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS);
+			ConfigurationManager.getDMLConfig().setTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS, "2");
 			String n = getName();
 			WriterCompressed.writeCompressedMatrixToHDFS(mb, n, blen);
 
+			MatrixReader r = ReaderCompressed.create();
+			MatrixBlock mb2 = r.readMatrixFromHDFS(n, (long) mb.getNumRows(), (long) mb.getNumColumns(), blen, -1L);
+			TestUtils.compareMatricesBitAvgDistance(mb, mb2, 0, 0);
 			SparkExecutionContext ec = ExecutionContextFactory.createSparkExecutionContext();
@@ -350,6 +374,7 @@ public void readRDDThroughSparkExecutionContext(MatrixBlock mb, int blen) {
 				.getRDDHandleForMatrixObject(obj, fmt);
 			List<Tuple2<MatrixIndexes, MatrixBlock>> c = m.collect();
 			verifySum(c, mb.sum(), 0.0001);
+			ConfigurationManager.getDMLConfig().setTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS, before);
 		}
 		catch(Exception e) {
 			e.printStackTrace();
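Reviewer note (not part of the patch): in readRDDThroughSparkExecutionContext the restore of LOCAL_SPARK_NUM_THREADS is the last statement inside the try block, so a thrown exception reaches the catch before the old value is written back, and the temporary 2-thread setting can leak into later tests through the shared ConfigurationManager singleton. The @Before/@After pair added to IOSpark avoids this; for the inline variant, a try/finally helper would make the restore unconditional. A sketch under that assumption, with a hypothetical helper name:

import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;

public class ConfigRestoreSketch {
	// Hypothetical helper: run a block under a temporary LOCAL_SPARK_NUM_THREADS
	// setting and always restore the previous value, even if the block throws.
	public static void withLocalSparkThreads(String threads, Runnable body) {
		final DMLConfig conf = ConfigurationManager.getDMLConfig();
		final String before = conf.getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS);
		conf.setTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS, threads);
		try {
			body.run();
		}
		finally {
			// Runs on both the success and the exception path.
			conf.setTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS, before);
		}
	}
}

A test body would then read as withLocalSparkThreads("2", () -> { /* write, read back, verify */ });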