Skip to content

Commit

Permalink
[SYSTEMDS-2913] Cache GPU objects on second hit
Browse files Browse the repository at this point in the history
This patch updates the reuse logic of GPU objects to skip
the first reference and cache on the second hit. This filters
out many never-repeating intermediates, which in turns reduces
GPU memory pressure, allocation and deallocation counts.

Closes #1876
  • Loading branch information
phaniarnab committed Aug 8, 2023
1 parent 8033619 commit eb1a697
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 30 deletions.
8 changes: 5 additions & 3 deletions src/main/java/org/apache/sysds/lops/UnaryCP.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ public String getOpCode() {

@Override
public String getInstructions(String input, String output) {
return InstructionUtils.concatOperands(
String ret = InstructionUtils.concatOperands(
getExecType().name(), getOpCode(),
getInputs().get(0).prepScalarInputOperand(getExecType()),
prepOutputOperand(output),
Integer.toString(_numThreads));
prepOutputOperand(output));
if (getExecType() == ExecType.CP || getExecType() == ExecType.FED)
ret = InstructionUtils.concatOperands(ret, Integer.toString(_numThreads));
return ret;
}
}
55 changes: 43 additions & 12 deletions src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ else if (e.isRDDPersist()) {
//putValueRDD method will save the RDD and call persist
e.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
//Cannot reuse rdd as already garbage collected
ec.replaceLineageItem(outName, e._key); //still reuse the lineage trace
return false;
case PERSISTEDRDD:
//Reuse the persisted intermediate at the executors
Expand All @@ -180,6 +181,12 @@ else if (e.isRDDPersist()) {
Pointer gpuPtr = e.getGPUPointer();
if (gpuPtr == null && e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
return false; //the executing thread removed this entry from cache
if (e.getCacheStatus() == LineageCacheStatus.TOCACHEGPU) { //second hit
//Cannot reuse as already garbage collected
ec.replaceLineageItem(outName, e._key); //still reuse the lineage trace
return false;
}
//Reuse from third hit onwards (status == GPUCACHED)
//Create a GPUObject with the cached pointer
GPUObject gpuObj = new GPUObject(ec.getGPUContext(0),
ec.getMatrixObject(outName), gpuPtr);
Expand Down Expand Up @@ -295,9 +302,20 @@ else if (e.isScalarValue()) {
if (reuse) {
//Additional maintenance for GPU pointers and RDDs
for (LineageCacheEntry e : funcOutLIs) {
if (e.isGPUObject())
//Increment the live count for this pointer
LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer());
if (e.isGPUObject()) {
switch(e.getCacheStatus()) {
case TOCACHEGPU:
//Cannot reuse as already garbage collected putValue method
// will save the pointer while caching the original instruction
return false;
case GPUCACHED:
//Increment the live count for this pointer
LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer());
break;
default:
return false;
}
}
else if (e.isRDDPersist()) {
//Reuse the cached RDD (local or persisted at the executors)
switch(e.getCacheStatus()) {
Expand Down Expand Up @@ -598,7 +616,7 @@ public static void putValue(Instruction inst, ExecutionContext ec, long starttim
}
}
else if (inst instanceof GPUInstruction) {
// TODO: gpu multiretrun instructions
// TODO: gpu multi-return instructions
Data gpudata = ec.getVariable(((GPUInstruction) inst)._output);
liGPUObj = gpudata instanceof MatrixObject ?
ec.getMatrixObject(((GPUInstruction)inst)._output).
Expand Down Expand Up @@ -708,14 +726,27 @@ private static void putValueGPU(GPUObject gpuObj, LineageItem instLI, long compu
removePlaceholder(instLI);
return;
}
// Update the total size of lineage cached gpu objects
// The eviction is handled by the unified gpu memory manager
LineageGPUCacheEviction.updateSize(gpuObj.getAllocatedSize(), true);
// Set the GPUOject in the cache
centry.setGPUValue(gpuObj.getDensePointer(), gpuObj.getAllocatedSize(),
gpuObj.getMatrixObject().getMetaData(), computetime);
// Maintain order for eviction
LineageGPUCacheEviction.addEntry(centry);
switch(centry.getCacheStatus()) {
case EMPTY: //first hit
// Set the GPUOject in the cache. Will be garbage collected
centry.setGPUValue(gpuObj.getDensePointer(), gpuObj.getAllocatedSize(),
gpuObj.getMatrixObject().getMetaData(), computetime);
centry.setCacheStatus(LineageCacheStatus.TOCACHEGPU);
break;
case TOCACHEGPU: //second hit
// Update the total size of lineage cached gpu objects
// The eviction is handled by the unified gpu memory manager
LineageGPUCacheEviction.updateSize(gpuObj.getAllocatedSize(), true);
// Set the GPUOject in the cache and update the status
centry.setGPUValue(gpuObj.getDensePointer(), gpuObj.getAllocatedSize(),
gpuObj.getMatrixObject().getMetaData(), computetime);
centry.setCacheStatus(LineageCacheStatus.GPUCACHED);
// Maintain order for eviction
LineageGPUCacheEviction.addEntry(centry);
break;
default:
throw new DMLRuntimeException("Execution should not reach here: "+centry._key);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class LineageCacheConfig
"^", "uamax", "uark+", "uacmean", "eigen", "ctableexpand", "replace",
"^2", "*2", "uack+", "tak+*", "uacsqk+", "uark+", "n+", "uarimax", "qsort",
"qpick", "transformapply", "uarmax", "n+", "-*", "castdtm", "lowertri",
"prefetch", "mapmm", "contains", "mmchain", "mapmmchain", "+*"
"prefetch", "mapmm", "contains", "mmchain", "mapmmchain", "+*", "=="
//TODO: Reuse everything.
};

Expand Down Expand Up @@ -152,6 +152,7 @@ protected enum LineageCacheStatus {
SPILLED, //Data is in disk. Empty value. Cannot be evicted.
RELOADED, //Reloaded from disk. Can be evicted.
PINNED, //Pinned to memory. Cannot be evicted.
TOCACHEGPU, //To be cached in GPU if the instruction reoccur
GPUCACHED, //Points to GPU intermediate
PERSISTEDRDD, //Persisted at the Spark executors
TOPERSISTRDD, //To be persisted if the instruction reoccur
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.sysds.runtime.lineage.Lineage;
import org.apache.sysds.runtime.lineage.LineageCacheConfig;
import org.apache.sysds.runtime.lineage.LineageCacheStatistics;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
import org.apache.sysds.runtime.matrix.data.MatrixValue;
import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
Expand All @@ -42,7 +43,7 @@ public class LineageReuseSparkTest extends AutomatedTestBase {

protected static final String TEST_DIR = "functions/async/";
protected static final String TEST_NAME = "LineageReuseSpark";
protected static final int TEST_VARIANTS = 6;
protected static final int TEST_VARIANTS = 7;
protected static String TEST_CLASS_DIR = TEST_DIR + LineageReuseSparkTest.class.getSimpleName() + "/";

@Override
Expand All @@ -54,44 +55,48 @@ public void setUp() {

@Test
public void testlmdsHB() {
runTest(TEST_NAME+"1", ExecMode.HYBRID, 1);
runTest(TEST_NAME+"1", ExecMode.HYBRID, ReuseCacheType.REUSE_FULL, 1);
}

@Test
public void testlmdsSP() {
// Only reuse the actions
runTest(TEST_NAME+"1", ExecMode.SPARK, 1);
runTest(TEST_NAME+"1", ExecMode.SPARK, ReuseCacheType.REUSE_MULTILEVEL, 1);
}

@Test
public void testlmdsRDD() {
// Cache all RDDs and persist shuffle-based Spark operations (eg. rmm, cpmm)
runTest(TEST_NAME+"2", ExecMode.HYBRID, 2);
runTest(TEST_NAME+"2", ExecMode.HYBRID, ReuseCacheType.REUSE_FULL, 2);
}

@Test
public void testL2svm() {
runTest(TEST_NAME+"3", ExecMode.HYBRID, 3);
runTest(TEST_NAME+"3", ExecMode.HYBRID, ReuseCacheType.REUSE_FULL, 3);
}

@Test
public void testlmdsMultiLevel() {
// Cache RDD and matrix block function returns and reuse
runTest(TEST_NAME+"4", ExecMode.HYBRID, 4);
runTest(TEST_NAME+"4", ExecMode.HYBRID, ReuseCacheType.REUSE_MULTILEVEL, 4);
}

@Test
public void testEnsemble() {
runTest(TEST_NAME+"5", ExecMode.HYBRID, 5);
runTest(TEST_NAME+"5", ExecMode.HYBRID, ReuseCacheType.REUSE_MULTILEVEL, 5);
}

//FIXME: Collecting a persisted RDD still needs the broadcast vars. Debug.
/*@Test
public void testHyperband() {
runTest(TEST_NAME+"6", ExecMode.HYBRID, 6);
runTest(TEST_NAME+"6", ExecMode.HYBRID, ReuseCacheType.REUSE_FULL, 6);
}*/
/*@Test
public void testBroadcastBug() {
runTest(TEST_NAME+"7", ExecMode.HYBRID, ReuseCacheType.REUSE_FULL, 7);
}*/

public void runTest(String testname, ExecMode execMode, int testId) {
public void runTest(String testname, ExecMode execMode, LineageCacheConfig.ReuseCacheType reuse, int testId) {
boolean old_simplification = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
boolean old_sum_product = OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES;
boolean old_trans_exec_type = OptimizerUtils.ALLOW_TRANSITIVE_SPARK_EXEC_TYPE;
Expand Down Expand Up @@ -126,7 +131,7 @@ public void runTest(String testname, ExecMode execMode, int testId) {
//proArgs.add("recompile_runtime");
proArgs.add("-stats");
proArgs.add("-lineage");
proArgs.add(LineageCacheConfig.ReuseCacheType.REUSE_MULTILEVEL.name().toLowerCase());
proArgs.add(reuse.name().toLowerCase());
proArgs.add("-args");
proArgs.add(output("R"));
programArgs = proArgs.toArray(new String[proArgs.size()]);
Expand Down
31 changes: 31 additions & 0 deletions src/test/scripts/functions/async/LineageReuseSpark7.dml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------

X = rand(rows=10000, cols=200, seed=42);
y = rand(rows=10000, cols=1, seed=43);

for (i in 1:3) {
s = t(X) %*% y; #mapmm single-block
Xd = X %*% s; #mapmm multi-block
out = 1 - y * Xd; #cp OP collects Xd
R = sum(out);
}
write(R, $1, format="text");
9 changes: 5 additions & 4 deletions src/test/scripts/functions/lineage/LineageReuseGPU3.dml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
#
#-------------------------------------------------------------

# Increase rows and cols for better performance gains

SimLM = function(Matrix[Double] X, Matrix[Double] y, Double lamda=0.0001) return (Matrix[Double] beta)
{
A = t(X) %*% X + diag(matrix(lamda, rows=ncol(X), cols=1));
Expand All @@ -34,13 +32,16 @@ c = 10

X = rand(rows=r, cols=c, seed=42);
y = rand(rows=r, cols=1, seed=43);
R = matrix(0, 1, 2);
R = matrix(0, 1, 3);

beta1 = SimLM(X, y, 0.0001);
R[,1] = sum(beta1);

beta2 = SimLM(X, y, 0.0001);
R[,2] = sum(beta2); #function reuse
R[,2] = sum(beta2); #second hit

beta3 = SimLM(X, y, 0.0001);
R[,3] = sum(beta3); #function reuse

write(R, $1, format="text");

0 comments on commit eb1a697

Please sign in to comment.