Skip to content

Commit

Permalink
[MINOR] Bug fixes in reuse in Spark
Browse files Browse the repository at this point in the history
This patch fixes multiple bugs in the intersection of lineage
tracing, caching and Spark operations. This patch also adds
a new test for top-k cleaning with a real dataset.

Closes #1891
  • Loading branch information
phaniarnab committed Aug 30, 2023
1 parent f094eac commit 5903419
Show file tree
Hide file tree
Showing 16 changed files with 8,660 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public void execute(ExecutionContext ec)
//statement-block-level, lineage-based reuse
LineageItem[] liInputs = null;
long t0 = 0;
if (_sb != null && LineageCacheConfig.isMultiLevelReuse() && !_sb.isNondeterministic()) {
boolean benefitFromReuse = LineageItemUtils.hasValidInsts(tmp); //large SB or has Spark instructions
if (_sb != null && LineageCacheConfig.isMultiLevelReuse() && !_sb.isNondeterministic() && benefitFromReuse) {
liInputs = LineageItemUtils.getLineageItemInputstoSB(_sb.getInputstoSB(), ec);
List<String> outNames = _sb.getOutputNamesofSB();
if(liInputs != null && LineageCache.reuse(outNames, _sb.getOutputsofSB(),
Expand All @@ -126,8 +127,8 @@ public void execute(ExecutionContext ec)
executeInstructions(tmp, ec);

//statement-block-level, lineage-based caching
if (_sb != null && liInputs != null && !_sb.isNondeterministic())
LineageCache.putValue(_sb.getOutputsofSB(),
liInputs, _sb.getName(), ec, System.nanoTime()-t0);
if (_sb != null && liInputs != null && !_sb.isNondeterministic() && benefitFromReuse) {
LineageCache.putValue(_sb.getOutputsofSB(), liInputs, _sb.getName(), ec, System.nanoTime() - t0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ public void processInstruction(ExecutionContext ec) {
Data exdata = ec.removeVariable(boundVarName);
if( exdata != boundValue && !retVars.hasReferences(exdata) )
ec.cleanupDataObject(exdata);
//FIXME: interferes with reuse. Removes broadcasts before materialization

//add/replace data in symbol table
ec.setVariable(boundVarName, boundValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

package org.apache.sysds.runtime.instructions.spark;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.hops.OptimizerUtils;
Expand Down Expand Up @@ -97,7 +97,6 @@ public void processInstruction(ExecutionContext ec) {
//add a dummy entry to the input, which will be immediately overwritten by the null output.
sec.setVariable( input1.getName(), new BooleanObject(false));
sec.setVariable( output.getName(), new BooleanObject(false));
replaceLineage(ec);
return;
}

Expand All @@ -106,7 +105,6 @@ public void processInstruction(ExecutionContext ec) {
// Do nothing if the RDD is already checkpointed
sec.setVariable(output.getName(), sec.getCacheableData(input1.getName()));
Statistics.decrementNoOfExecutedSPInst();
replaceLineage(ec);
return;
}
//-------
Expand All @@ -121,7 +119,6 @@ public void processInstruction(ExecutionContext ec) {
//available in memory
sec.setVariable(output.getName(), obj);
Statistics.decrementNoOfExecutedSPInst();
replaceLineage(ec);
return;
}

Expand Down Expand Up @@ -188,7 +185,6 @@ else if( input1.getDataType() == DataType.FRAME)
}
else {
out = in; //pass-through
replaceLineage(ec);
}

// Step 3: In-place update of input matrix/frame rdd handle and set as output
Expand All @@ -209,18 +205,13 @@ else if( input1.getDataType() == DataType.FRAME)
cd.setRDDHandle(outro);
}
sec.setVariable( output.getName(), cd);
//TODO: remove lineage tracing of chkpoint to allow
// reuse across loops and basic blocks
//replaceLineage(ec);
}

private void replaceLineage(ExecutionContext ec) {
@Override
public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
// Copy the lineage trace of the input to the output
// to prevent unnecessary chkpoint lineage entry, which wrongly
// reduces reuse opportunities for nested loop bodies.
if (DMLScript.LINEAGE) {
LineageItem inputLi = ec.getLineageItem(input1.getName());
ec.getLineage().set(output.getName(), inputLi);
}
return Pair.of(output.getName(), ec.getLineageItem(input1.getName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import java.util.HashMap;
import java.util.Iterator;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.sysds.common.Types;
import org.apache.sysds.common.Types.CorrectionLocationType;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
Expand Down Expand Up @@ -61,6 +63,8 @@
import org.apache.sysds.runtime.instructions.spark.utils.FrameRDDConverterUtils;
import org.apache.sysds.runtime.instructions.spark.utils.RDDAggregateUtils;
import org.apache.sysds.runtime.instructions.spark.utils.SparkUtils;
import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.runtime.lineage.LineageItemUtils;
import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixCell;
Expand Down Expand Up @@ -559,6 +563,73 @@ else if(opcode.equalsIgnoreCase("transformdecode")) {
throw new DMLRuntimeException("Unknown parameterized builtin opcode: " + opcode);
}
}
@Override
public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
String opcode = getOpcode();
if(opcode.equalsIgnoreCase("replace")) {
CPOperand target = getTargetOperand();
CPOperand pattern = getFP64Literal("pattern");
CPOperand replace = getFP64Literal("replacement");
return Pair.of(output.getName(),
new LineageItem(getOpcode(), LineageItemUtils.getLineage(ec, target, pattern, replace)));
}
else if(opcode.equalsIgnoreCase("rmempty")) {
CPOperand target = getTargetOperand();
String off = params.get("offset");
CPOperand offset = new CPOperand(off, ValueType.FP64, Types.DataType.MATRIX);
CPOperand margin = getStringLiteral("margin");
CPOperand emptyReturn = getBoolLiteral("empty.return");
CPOperand maxDim = getLiteral("maxdim", ValueType.FP64);
CPOperand bRmEmptyBC = getBoolLiteral("bRmEmptyBC");
return Pair.of(output.getName(),
new LineageItem(getOpcode(), LineageItemUtils.getLineage(ec, target, offset, margin,
emptyReturn, maxDim, bRmEmptyBC)));
}
else if(opcode.equalsIgnoreCase("transformdecode") || opcode.equalsIgnoreCase("transformapply")) {
CPOperand target = new CPOperand(params.get("target"), ValueType.FP64, Types.DataType.FRAME);
CPOperand meta = getLiteral("meta", ValueType.UNKNOWN, Types.DataType.FRAME);
CPOperand spec = getStringLiteral("spec");
//FIXME: Taking only spec file name as a literal leads to wrong reuse
//TODO: Add Embedding to the lineage item
return Pair.of(output.getName(),
new LineageItem(getOpcode(), LineageItemUtils.getLineage(ec, target, meta, spec)));
}
if(opcode.equalsIgnoreCase("contains")) {
CPOperand target = getTargetOperand();
CPOperand pattern = getFP64Literal("pattern");
return Pair.of(output.getName(),
new LineageItem(getOpcode(), LineageItemUtils.getLineage(ec, target, pattern)));
}
else {
// NOTE: for now, we cannot have a generic fall through path, because the
// data and value types of parmeters are not compiled into the instruction
throw new DMLRuntimeException("Unsupported lineage tracing for: " + opcode);
}
}

private CPOperand getTargetOperand() {
return new CPOperand(params.get("target"), ValueType.FP64, Types.DataType.MATRIX);
}

private CPOperand getFP64Literal(String name) {
return getLiteral(name, ValueType.FP64);
}

private CPOperand getStringLiteral(String name) {
return getLiteral(name, ValueType.STRING);
}

private CPOperand getLiteral(String name, ValueType vt) {
return new CPOperand(params.get(name), vt, Types.DataType.SCALAR, true);
}

private CPOperand getLiteral(String name, ValueType vt, Types.DataType dt) {
return new CPOperand(params.get(name), vt, dt);
}

private CPOperand getBoolLiteral(String name) {
return getLiteral(name, ValueType.BOOLEAN);
}

public HashMap<String, String> getParameterMap() {
return params;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ public DataCharacteristics getDataCharacteristics() {
*/
public boolean allowsShortCircuitRead()
{
// Cannot trust the hdfs file for reused RDD objects
if (isInLineageCache() && isCheckpointRDD())
return false;

boolean ret = isHDFSFile();

if( isCheckpointRDD() && getLineageChilds().size() == 1 ) {
Expand Down
48 changes: 37 additions & 11 deletions src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.lops.MMTSJ.MMTSJType;
import org.apache.sysds.parser.DataIdentifier;
Expand Down Expand Up @@ -246,13 +247,16 @@ public static boolean reuse(List<String> outNames, List<DataIdentifier> outParam
if (e != null) {
String boundVarName = outNames.get(i);
Data boundValue = null;
//String fname = "target\\testTemp\\functions\\async\\LineageReuseSparkTest\\LineageReuseSpark8/target/scratch_space//_p11736_192.168.0.113//_t0/temp999";
//fname = VariableCPInstruction.getUniqueFileName(fname);
//convert to matrix object
if (e.isMatrixValue()) {
MatrixBlock mb = e.getMBValue();
if (mb == null && e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
return false; //the executing thread removed this entry from cache
MetaDataFormat md = new MetaDataFormat(
e.getMBValue().getDataCharacteristics(),FileFormat.BINARY);
md.getDataCharacteristics().setBlocksize(ConfigurationManager.getBlocksize());
boundValue = new MatrixObject(ValueType.FP64, boundVarName, md);
((MatrixObject)boundValue).acquireModify(e.getMBValue());
((MatrixObject)boundValue).release();
Expand Down Expand Up @@ -311,6 +315,7 @@ else if (e.isScalarValue()) {
case GPUCACHED:
//Increment the live count for this pointer
LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer());
if (DMLScript.STATISTICS) LineageCacheStatistics.incrementGpuHits();
break;
default:
return false;
Expand All @@ -328,10 +333,14 @@ else if (e.isRDDPersist()) {
//Reuse the persisted intermediate at the executors
//Safely cleanup the child RDDs if this RDD is persisted already
//If reused 3 times and still not persisted, move to Spark asynchronously
if (probeRDDDistributed(e))
if (probeRDDDistributed(e)) {
LineageSparkCacheEviction.cleanupChildRDDs(e);
else
if (DMLScript.STATISTICS) LineageCacheStatistics.incrementRDDPersistHits();
}
else {
LineageSparkCacheEviction.moveToSpark(e);
if (DMLScript.STATISTICS) LineageCacheStatistics.incrementRDDHits();
}
break;
default:
return false;
Expand Down Expand Up @@ -395,6 +404,7 @@ public static FederatedResponse reuse(FederatedUDF udf, ExecutionContext ec)

MetaDataFormat md = new MetaDataFormat(
e.getMBValue().getDataCharacteristics(),FileFormat.BINARY);
md.getDataCharacteristics().setBlocksize(ConfigurationManager.getBlocksize());
outValue = new MatrixObject(ValueType.FP64, outName, md);
((MatrixObject)outValue).acquireModify(e.getMBValue());
((MatrixObject)outValue).release();
Expand Down Expand Up @@ -604,9 +614,11 @@ public static void putValue(Instruction inst, ExecutionContext ec, long starttim
//if (!isMarkedForCaching(inst, ec)) return;
List<Pair<LineageItem, Data>> liData = null;
GPUObject liGPUObj= null;
LineageItem instLI = ((LineageTraceable) inst).getLineageItem(ec).getValue();
//LineageItem instLI = ((LineageTraceable) inst).getLineageItem(ec).getValue();
LineageItem instLI = null;
if (inst instanceof MultiReturnBuiltinCPInstruction) {
liData = new ArrayList<>();
instLI = ((LineageTraceable) inst).getLineageItem(ec).getValue();
MultiReturnBuiltinCPInstruction mrInst = (MultiReturnBuiltinCPInstruction)inst;
for (int i=0; i<mrInst.getNumOutputs(); i++) {
String opcode = instLI.getOpcode() + String.valueOf(i);
Expand All @@ -630,16 +642,23 @@ else if (inst instanceof GPUInstruction) {
else if (inst instanceof ComputationSPInstruction
&& (ec.getVariable(((ComputationSPInstruction) inst).output) instanceof MatrixObject)
&& (ec.getCacheableData(((ComputationSPInstruction)inst).output.getName())).hasRDDHandle()) {
instLI = ec.getLineageItem(((ComputationSPInstruction) inst).output);
putValueRDD(inst, instLI, ec, computetime);
return;
}
else
if (inst instanceof ComputationCPInstruction)
if (inst instanceof ComputationCPInstruction) {
instLI = ec.getLineageItem(((ComputationCPInstruction) inst).output);
liData = Arrays.asList(Pair.of(instLI, ec.getVariable(((ComputationCPInstruction) inst).output)));
else if (inst instanceof ComputationFEDInstruction)
}
else if (inst instanceof ComputationFEDInstruction) {
instLI = ec.getLineageItem(((ComputationFEDInstruction) inst).output);
liData = Arrays.asList(Pair.of(instLI, ec.getVariable(((ComputationFEDInstruction) inst).output)));
else if (inst instanceof ComputationSPInstruction) //collects or prefetches
}
else if (inst instanceof ComputationSPInstruction) { //collects or prefetches
instLI = ec.getLineageItem(((ComputationSPInstruction) inst).output);
liData = Arrays.asList(Pair.of(instLI, ec.getVariable(((ComputationSPInstruction) inst).output)));
}

if (liGPUObj == null)
putValueCPU(inst, liData, computetime);
Expand Down Expand Up @@ -857,8 +876,7 @@ public static void putValue(List<DataIdentifier> outputs,
LineageItem li = new LineageItem(opcode, liInputs);
String boundVarName = outputs.get(i).getName();
LineageItem boundLI = ec.getLineage().get(boundVarName);
if (boundLI != null)
boundLI.resetVisitStatusNR();
if (boundLI != null) boundLI.resetVisitStatusNR();
if (boundLI == null || !LineageCache.probe(li) || !LineageCache.probe(boundLI)) {
AllOutputsCacheable = false;
//FIXME: if boundLI is for a MultiReturnBuiltin instruction
Expand Down Expand Up @@ -1052,8 +1070,12 @@ private static void putIntern(LineageItem key, DataType dt, MatrixBlock Mval, Sc

private static LineageCacheEntry getIntern(LineageItem key) {
LineageCacheEntry e = _cache.get(key);
if (e == null)
if (e == null) {
if(DMLScript.STATISTICS && LineageCacheEviction._removelist.containsKey(key))
// The sought entry was in cache but removed later
LineageCacheStatistics.incrementDelHits();
return null;
}

if (e.getCacheStatus() != LineageCacheStatus.SPILLED) {
if (DMLScript.STATISTICS)
Expand Down Expand Up @@ -1351,10 +1373,14 @@ private static List<MutablePair<LineageItem, LineageCacheEntry>> getLineageItems

List<MutablePair<LineageItem, LineageCacheEntry>> liList = null;
//FIXME: Replace getLineageItem with get/getOrCreate to avoid creating a new LI object
LineageItem instLI = (cinst != null) ? cinst.getLineageItem(ec).getValue()
LineageItem instLI = (cinst != null) ? ec.getLineageItem(cinst.output)
: (cfinst != null) ? ec.getLineageItem(cfinst.output)
: (cspinst != null) ? ec.getLineageItem(cspinst.output)
: ec.getLineageItem(gpuinst._output);
/*LineageItem instLI = (cinst != null) ? cinst.getLineageItem(ec).getValue()
: (cfinst != null) ? cfinst.getLineageItem(ec).getValue()
: (cspinst != null) ? cspinst.getLineageItem(ec).getValue()
: gpuinst.getLineageItem(ec).getValue();
: gpuinst.getLineageItem(ec).getValue();*/
if (inst instanceof MultiReturnBuiltinCPInstruction) {
liList = new ArrayList<>();
MultiReturnBuiltinCPInstruction mrInst = (MultiReturnBuiltinCPInstruction)inst;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,28 @@ public class LineageCacheConfig

private static final String[] OPCODES = new String[] {
"tsmm", "ba+*", "*", "/", "+", "||", "nrow", "ncol", "round", "exp", "log",
"rightIndex", "leftIndex", "groupedagg", "r'", "solve", "spoof",
"rightIndex", "leftIndex", "groupedagg", "r'", "solve", "spoof", "isna",
"uamean", "max", "min", "ifelse", "-", "sqrt", "<", ">", "uak+", "<=",
"^", "uamax", "uark+", "uacmean", "eigen", "ctableexpand", "replace",
"^", "uamax", "uark+", "uacmean", "eigen","ctable", "ctableexpand", "replace",
"^2", "*2", "uack+", "tak+*", "uacsqk+", "uark+", "n+", "uarimax", "qsort",
"qpick", "transformapply", "uarmax", "n+", "-*", "castdtm", "lowertri",
"prefetch", "mapmm", "contains", "mmchain", "mapmmchain", "+*", "=="
"prefetch", "mapmm", "contains", "mmchain", "mapmmchain", "+*", "==", "rmempty"
//TODO: Reuse everything.
};

// Relatively expensive instructions. Most include shuffles.
private static final String[] PERSIST_OPCODES1 = new String[] {
"cpmm", "rmm", "pmm", "rev", "rshape", "rsort", "+", "-", "*",
"cpmm", "rmm", "pmm", "rev", "rshape", "rsort", "-", "*", "+",
"/", "%%", "%/%", "1-*", "^", "^2", "*2", "==", "!=", "<", ">",
"<=", ">=", "&&", "||", "xor", "max", "min", "rmempty", "rappend",
"gappend", "galignedappend", "rbind", "cbind", "nmin", "nmax",
"n+", "ctable", "ucumack+", "ucumac*", "ucumacmin", "ucumacmax",
"qsort", "qpick"
"qsort", "qpick", "replace"
};

// Relatively inexpensive instructions.
private static final String[] PERSIST_OPCODES2 = new String[] {
"mapmm"
"mapmm", "isna", "leftIndex", "rightIndex"
};

private static String[] REUSE_OPCODES = new String[] {};
Expand Down
Loading

0 comments on commit 5903419

Please sign in to comment.