From eb1a69718c9009eae2a6c292407bb8362ecbeaec Mon Sep 17 00:00:00 2001 From: Arnab Phani Date: Tue, 8 Aug 2023 09:26:32 +0200 Subject: [PATCH] [SYSTEMDS-2913] Cache GPU objects on second hit This patch updates the reuse logic of GPU objects to skip the first reference and cache on the second hit. This filters out many never-repeating intermediates, which in turns reduces GPU memory pressure, allocation and deallocation counts. Closes #1876 --- .../java/org/apache/sysds/lops/UnaryCP.java | 8 ++- .../sysds/runtime/lineage/LineageCache.java | 55 +++++++++++++++---- .../runtime/lineage/LineageCacheConfig.java | 3 +- .../async/LineageReuseSparkTest.java | 25 +++++---- .../functions/async/LineageReuseSpark7.dml | 31 +++++++++++ .../functions/lineage/LineageReuseGPU3.dml | 9 +-- 6 files changed, 101 insertions(+), 30 deletions(-) create mode 100644 src/test/scripts/functions/async/LineageReuseSpark7.dml diff --git a/src/main/java/org/apache/sysds/lops/UnaryCP.java b/src/main/java/org/apache/sysds/lops/UnaryCP.java index 7dd6a30e58b..e44c8968e7b 100644 --- a/src/main/java/org/apache/sysds/lops/UnaryCP.java +++ b/src/main/java/org/apache/sysds/lops/UnaryCP.java @@ -72,10 +72,12 @@ public String getOpCode() { @Override public String getInstructions(String input, String output) { - return InstructionUtils.concatOperands( + String ret = InstructionUtils.concatOperands( getExecType().name(), getOpCode(), getInputs().get(0).prepScalarInputOperand(getExecType()), - prepOutputOperand(output), - Integer.toString(_numThreads)); + prepOutputOperand(output)); + if (getExecType() == ExecType.CP || getExecType() == ExecType.FED) + ret = InstructionUtils.concatOperands(ret, Integer.toString(_numThreads)); + return ret; } } diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java index 5ef81d31339..57f909efd77 100644 --- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java +++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java @@ -161,6 +161,7 @@ else if (e.isRDDPersist()) { //putValueRDD method will save the RDD and call persist e.setCacheStatus(LineageCacheStatus.PERSISTEDRDD); //Cannot reuse rdd as already garbage collected + ec.replaceLineageItem(outName, e._key); //still reuse the lineage trace return false; case PERSISTEDRDD: //Reuse the persisted intermediate at the executors @@ -180,6 +181,12 @@ else if (e.isRDDPersist()) { Pointer gpuPtr = e.getGPUPointer(); if (gpuPtr == null && e.getCacheStatus() == LineageCacheStatus.NOTCACHED) return false; //the executing thread removed this entry from cache + if (e.getCacheStatus() == LineageCacheStatus.TOCACHEGPU) { //second hit + //Cannot reuse as already garbage collected + ec.replaceLineageItem(outName, e._key); //still reuse the lineage trace + return false; + } + //Reuse from third hit onwards (status == GPUCACHED) //Create a GPUObject with the cached pointer GPUObject gpuObj = new GPUObject(ec.getGPUContext(0), ec.getMatrixObject(outName), gpuPtr); @@ -295,9 +302,20 @@ else if (e.isScalarValue()) { if (reuse) { //Additional maintenance for GPU pointers and RDDs for (LineageCacheEntry e : funcOutLIs) { - if (e.isGPUObject()) - //Increment the live count for this pointer - LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer()); + if (e.isGPUObject()) { + switch(e.getCacheStatus()) { + case TOCACHEGPU: + //Cannot reuse as already garbage collected putValue method + // will save the pointer while caching the original instruction + return false; + case GPUCACHED: + //Increment the live count for this pointer + LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer()); + break; + default: + return false; + } + } else if (e.isRDDPersist()) { //Reuse the cached RDD (local or persisted at the executors) switch(e.getCacheStatus()) { @@ -598,7 +616,7 @@ public static void putValue(Instruction inst, ExecutionContext ec, long starttim } } else if (inst instanceof GPUInstruction) { - // TODO: gpu multiretrun instructions + // TODO: gpu multi-return instructions Data gpudata = ec.getVariable(((GPUInstruction) inst)._output); liGPUObj = gpudata instanceof MatrixObject ? ec.getMatrixObject(((GPUInstruction)inst)._output). @@ -708,14 +726,27 @@ private static void putValueGPU(GPUObject gpuObj, LineageItem instLI, long compu removePlaceholder(instLI); return; } - // Update the total size of lineage cached gpu objects - // The eviction is handled by the unified gpu memory manager - LineageGPUCacheEviction.updateSize(gpuObj.getAllocatedSize(), true); - // Set the GPUOject in the cache - centry.setGPUValue(gpuObj.getDensePointer(), gpuObj.getAllocatedSize(), - gpuObj.getMatrixObject().getMetaData(), computetime); - // Maintain order for eviction - LineageGPUCacheEviction.addEntry(centry); + switch(centry.getCacheStatus()) { + case EMPTY: //first hit + // Set the GPUOject in the cache. Will be garbage collected + centry.setGPUValue(gpuObj.getDensePointer(), gpuObj.getAllocatedSize(), + gpuObj.getMatrixObject().getMetaData(), computetime); + centry.setCacheStatus(LineageCacheStatus.TOCACHEGPU); + break; + case TOCACHEGPU: //second hit + // Update the total size of lineage cached gpu objects + // The eviction is handled by the unified gpu memory manager + LineageGPUCacheEviction.updateSize(gpuObj.getAllocatedSize(), true); + // Set the GPUOject in the cache and update the status + centry.setGPUValue(gpuObj.getDensePointer(), gpuObj.getAllocatedSize(), + gpuObj.getMatrixObject().getMetaData(), computetime); + centry.setCacheStatus(LineageCacheStatus.GPUCACHED); + // Maintain order for eviction + LineageGPUCacheEviction.addEntry(centry); + break; + default: + throw new DMLRuntimeException("Execution should not reach here: "+centry._key); + } } } diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java index 2971c36f164..09258201842 100644 --- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java +++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java @@ -54,7 +54,7 @@ public class LineageCacheConfig "^", "uamax", "uark+", "uacmean", "eigen", "ctableexpand", "replace", "^2", "*2", "uack+", "tak+*", "uacsqk+", "uark+", "n+", "uarimax", "qsort", "qpick", "transformapply", "uarmax", "n+", "-*", "castdtm", "lowertri", - "prefetch", "mapmm", "contains", "mmchain", "mapmmchain", "+*" + "prefetch", "mapmm", "contains", "mmchain", "mapmmchain", "+*", "==" //TODO: Reuse everything. }; @@ -152,6 +152,7 @@ protected enum LineageCacheStatus { SPILLED, //Data is in disk. Empty value. Cannot be evicted. RELOADED, //Reloaded from disk. Can be evicted. PINNED, //Pinned to memory. Cannot be evicted. + TOCACHEGPU, //To be cached in GPU if the instruction reoccur GPUCACHED, //Points to GPU intermediate PERSISTEDRDD, //Persisted at the Spark executors TOPERSISTRDD, //To be persisted if the instruction reoccur diff --git a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java index f2cf0858387..3ace92b9949 100644 --- a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java +++ b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java @@ -30,6 +30,7 @@ import org.apache.sysds.runtime.lineage.Lineage; import org.apache.sysds.runtime.lineage.LineageCacheConfig; import org.apache.sysds.runtime.lineage.LineageCacheStatistics; +import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType; import org.apache.sysds.runtime.matrix.data.MatrixValue; import org.apache.sysds.test.AutomatedTestBase; import org.apache.sysds.test.TestConfiguration; @@ -42,7 +43,7 @@ public class LineageReuseSparkTest extends AutomatedTestBase { protected static final String TEST_DIR = "functions/async/"; protected static final String TEST_NAME = "LineageReuseSpark"; - protected static final int TEST_VARIANTS = 6; + protected static final int TEST_VARIANTS = 7; protected static String TEST_CLASS_DIR = TEST_DIR + LineageReuseSparkTest.class.getSimpleName() + "/"; @Override @@ -54,44 +55,48 @@ public void setUp() { @Test public void testlmdsHB() { - runTest(TEST_NAME+"1", ExecMode.HYBRID, 1); + runTest(TEST_NAME+"1", ExecMode.HYBRID, ReuseCacheType.REUSE_FULL, 1); } @Test public void testlmdsSP() { // Only reuse the actions - runTest(TEST_NAME+"1", ExecMode.SPARK, 1); + runTest(TEST_NAME+"1", ExecMode.SPARK, ReuseCacheType.REUSE_MULTILEVEL, 1); } @Test public void testlmdsRDD() { // Cache all RDDs and persist shuffle-based Spark operations (eg. rmm, cpmm) - runTest(TEST_NAME+"2", ExecMode.HYBRID, 2); + runTest(TEST_NAME+"2", ExecMode.HYBRID, ReuseCacheType.REUSE_FULL, 2); } @Test public void testL2svm() { - runTest(TEST_NAME+"3", ExecMode.HYBRID, 3); + runTest(TEST_NAME+"3", ExecMode.HYBRID, ReuseCacheType.REUSE_FULL, 3); } @Test public void testlmdsMultiLevel() { // Cache RDD and matrix block function returns and reuse - runTest(TEST_NAME+"4", ExecMode.HYBRID, 4); + runTest(TEST_NAME+"4", ExecMode.HYBRID, ReuseCacheType.REUSE_MULTILEVEL, 4); } @Test public void testEnsemble() { - runTest(TEST_NAME+"5", ExecMode.HYBRID, 5); + runTest(TEST_NAME+"5", ExecMode.HYBRID, ReuseCacheType.REUSE_MULTILEVEL, 5); } //FIXME: Collecting a persisted RDD still needs the broadcast vars. Debug. /*@Test public void testHyperband() { - runTest(TEST_NAME+"6", ExecMode.HYBRID, 6); + runTest(TEST_NAME+"6", ExecMode.HYBRID, ReuseCacheType.REUSE_FULL, 6); + }*/ + /*@Test + public void testBroadcastBug() { + runTest(TEST_NAME+"7", ExecMode.HYBRID, ReuseCacheType.REUSE_FULL, 7); }*/ - public void runTest(String testname, ExecMode execMode, int testId) { + public void runTest(String testname, ExecMode execMode, LineageCacheConfig.ReuseCacheType reuse, int testId) { boolean old_simplification = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION; boolean old_sum_product = OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES; boolean old_trans_exec_type = OptimizerUtils.ALLOW_TRANSITIVE_SPARK_EXEC_TYPE; @@ -126,7 +131,7 @@ public void runTest(String testname, ExecMode execMode, int testId) { //proArgs.add("recompile_runtime"); proArgs.add("-stats"); proArgs.add("-lineage"); - proArgs.add(LineageCacheConfig.ReuseCacheType.REUSE_MULTILEVEL.name().toLowerCase()); + proArgs.add(reuse.name().toLowerCase()); proArgs.add("-args"); proArgs.add(output("R")); programArgs = proArgs.toArray(new String[proArgs.size()]); diff --git a/src/test/scripts/functions/async/LineageReuseSpark7.dml b/src/test/scripts/functions/async/LineageReuseSpark7.dml new file mode 100644 index 00000000000..a1c00881685 --- /dev/null +++ b/src/test/scripts/functions/async/LineageReuseSpark7.dml @@ -0,0 +1,31 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +X = rand(rows=10000, cols=200, seed=42); +y = rand(rows=10000, cols=1, seed=43); + +for (i in 1:3) { + s = t(X) %*% y; #mapmm single-block + Xd = X %*% s; #mapmm multi-block + out = 1 - y * Xd; #cp OP collects Xd + R = sum(out); +} +write(R, $1, format="text"); diff --git a/src/test/scripts/functions/lineage/LineageReuseGPU3.dml b/src/test/scripts/functions/lineage/LineageReuseGPU3.dml index be15ba20263..71674d426e6 100644 --- a/src/test/scripts/functions/lineage/LineageReuseGPU3.dml +++ b/src/test/scripts/functions/lineage/LineageReuseGPU3.dml @@ -19,8 +19,6 @@ # #------------------------------------------------------------- -# Increase rows and cols for better performance gains - SimLM = function(Matrix[Double] X, Matrix[Double] y, Double lamda=0.0001) return (Matrix[Double] beta) { A = t(X) %*% X + diag(matrix(lamda, rows=ncol(X), cols=1)); @@ -34,13 +32,16 @@ c = 10 X = rand(rows=r, cols=c, seed=42); y = rand(rows=r, cols=1, seed=43); -R = matrix(0, 1, 2); +R = matrix(0, 1, 3); beta1 = SimLM(X, y, 0.0001); R[,1] = sum(beta1); beta2 = SimLM(X, y, 0.0001); -R[,2] = sum(beta2); #function reuse +R[,2] = sum(beta2); #second hit + +beta3 = SimLM(X, y, 0.0001); +R[,3] = sum(beta3); #function reuse write(R, $1, format="text");