From 952dc09bd45051a51bbecaab9333d379a95b6a03 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sat, 31 May 2014 18:11:32 -0700 Subject: [PATCH 1/2] Fixed a bug that some columns become null for in-memory tables when lateral view explode is used. --- .../execution/LateralViewJoinOperator.scala | 8 +-- .../execution/optimization/ColumnPruner.scala | 49 ++++++++++++------- 2 files changed, 35 insertions(+), 22 deletions(-) diff --git a/src/main/scala/shark/execution/LateralViewJoinOperator.scala b/src/main/scala/shark/execution/LateralViewJoinOperator.scala index c30005d4..f37b3841 100755 --- a/src/main/scala/shark/execution/LateralViewJoinOperator.scala +++ b/src/main/scala/shark/execution/LateralViewJoinOperator.scala @@ -137,7 +137,7 @@ class LateralViewJoinOperator extends NaryOperator[LateralViewJoinDesc] { val lvjSelFields = lvjSelSoi.getAllStructFieldRefs() iter.flatMap { row => - var arrToExplode = udtfEval.map(x => x.evaluate(row)) + val arrToExplode = udtfEval.map(x => x.evaluate(row)) val explodedRows = udtfOp.explode(arrToExplode) explodedRows.map { expRow => @@ -145,8 +145,9 @@ class LateralViewJoinOperator extends NaryOperator[LateralViewJoinDesc] { val joinedRow = new Array[java.lang.Object](lvjSelFields.size + expRowArray.length) // Add row fields from LateralViewForward + val lvjSelFieldsLen = lvjSelFields.size var i = 0 - while (i < lvjSelFields.size) { + while (i < lvjSelFieldsLen) { if (lvjSelEval != null) { joinedRow(i) = lvjSelEval(i).evaluate(row) } else { @@ -157,9 +158,10 @@ class LateralViewJoinOperator extends NaryOperator[LateralViewJoinDesc] { // Append element(s) from explode i = 0 while (i < expRowArray.length) { - joinedRow(i + lvjSelFields.size) = expRowArray(i) + joinedRow(i + lvjSelFieldsLen) = expRowArray(i) i += 1 } + println(joinedRow.toSeq) joinedRow } } diff --git a/src/main/scala/shark/execution/optimization/ColumnPruner.scala b/src/main/scala/shark/execution/optimization/ColumnPruner.scala index ab928543..549f7a62 100644 --- a/src/main/scala/shark/execution/optimization/ColumnPruner.scala +++ b/src/main/scala/shark/execution/optimization/ColumnPruner.scala @@ -65,29 +65,14 @@ class ColumnPruner(@transient op: TopOperator[_], @transient tbl: Table) extends cols: HashSet[String], parentOp: Operator[_] = null) { - def nullGuard[T](s: JList[T]): Seq[T] = { - if (s == null) Seq[T]() else s - } + println("operator to check is " + op) op match { case selOp: SelectOperator => - val cnf: SelectDesc = selOp.getConf - //Select Descriptor contains SelectConf, which holds the list of columns - //referenced by the select op - if (cnf != null) { - if (cnf.isSelStarNoCompute) { - // For star, return immediately since there is no point doing any further pruning. - cols.clear() - cols += "*" - return cols - } else { - val evals = nullGuard(cnf.getColList) - cols ++= (HashSet() ++ evals).flatMap(x => nullGuard(x.getCols)) - } - } + cols ++= getColumnsFromSelectOp(selOp) case filterOp: FilterOperator => - val cnf:FilterDesc = filterOp.getConf + val cnf: FilterDesc = filterOp.getConf //FilterDesc has predicates, which are the columns involved in predicate operations if (cnf != null) { cols ++= (HashSet() ++ nullGuard(cnf.getPredicate.getCols)) @@ -120,6 +105,12 @@ class ColumnPruner(@transient op: TopOperator[_], @transient tbl: Table) extends cols ++= (HashSet() ++ keys).flatMap(x => nullGuard(x.getCols)) } + case lvj: LateralViewJoinOperator => + lvj.parentOperators.head match { + case selOp: SelectOperator => cols ++= getColumnsFromSelectOp(selOp) + case _ => // Do nothing + } + case _ => } @@ -136,11 +127,31 @@ class ColumnPruner(@transient op: TopOperator[_], @transient tbl: Table) extends // Note that the actual Select Op in that branch only contains the Array evaluators, so we // can't column prune based on it. cols += "*" - return cols } else { computeColumnsToKeep(childOp, cols, op) } currentChildIndex = currentChildIndex + 1 } } + + private def nullGuard[T](s: JList[T]): Seq[T] = { + if (s == null) Seq[T]() else s + } + + private def getColumnsFromSelectOp(selOp: SelectOperator): Set[String] = { + val cnf: SelectDesc = selOp.getConf + // Select Descriptor contains SelectConf, which holds the list of columns + // referenced by the select op + if (cnf != null) { + if (cnf.isSelStarNoCompute) { + // For star, return immediately since there is no point doing any further pruning. + Set("*") + } else { + val evals = nullGuard(cnf.getColList) + Set() ++ evals.flatMap(x => nullGuard(x.getCols)) + } + } else { + Set.empty + } + } } From c119ffd9446777cc5618abf703e9e0deb0c25f11 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sat, 31 May 2014 18:14:30 -0700 Subject: [PATCH 2/2] Remove the extra println statements. --- src/main/scala/shark/execution/LateralViewJoinOperator.scala | 1 - src/main/scala/shark/execution/optimization/ColumnPruner.scala | 2 -- 2 files changed, 3 deletions(-) diff --git a/src/main/scala/shark/execution/LateralViewJoinOperator.scala b/src/main/scala/shark/execution/LateralViewJoinOperator.scala index f37b3841..b33dc852 100755 --- a/src/main/scala/shark/execution/LateralViewJoinOperator.scala +++ b/src/main/scala/shark/execution/LateralViewJoinOperator.scala @@ -161,7 +161,6 @@ class LateralViewJoinOperator extends NaryOperator[LateralViewJoinDesc] { joinedRow(i + lvjSelFieldsLen) = expRowArray(i) i += 1 } - println(joinedRow.toSeq) joinedRow } } diff --git a/src/main/scala/shark/execution/optimization/ColumnPruner.scala b/src/main/scala/shark/execution/optimization/ColumnPruner.scala index 549f7a62..8c2437aa 100644 --- a/src/main/scala/shark/execution/optimization/ColumnPruner.scala +++ b/src/main/scala/shark/execution/optimization/ColumnPruner.scala @@ -65,8 +65,6 @@ class ColumnPruner(@transient op: TopOperator[_], @transient tbl: Table) extends cols: HashSet[String], parentOp: Operator[_] = null) { - println("operator to check is " + op) - op match { case selOp: SelectOperator => cols ++= getColumnsFromSelectOp(selOp)