Skip to content

Commit

Permalink
Merge pull request #336 from rxin/master
Browse files Browse the repository at this point in the history
Fixed a bug that some columns become null for in-memory tables when lateral view explode is used.
  • Loading branch information
rxin committed Jun 1, 2014
2 parents c177c67 + c119ffd commit f8c16f2
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 23 deletions.
7 changes: 4 additions & 3 deletions src/main/scala/shark/execution/LateralViewJoinOperator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,17 @@ class LateralViewJoinOperator extends NaryOperator[LateralViewJoinDesc] {
val lvjSelFields = lvjSelSoi.getAllStructFieldRefs()

iter.flatMap { row =>
var arrToExplode = udtfEval.map(x => x.evaluate(row))
val arrToExplode = udtfEval.map(x => x.evaluate(row))
val explodedRows = udtfOp.explode(arrToExplode)

explodedRows.map { expRow =>
val expRowArray = expRow.asInstanceOf[Array[java.lang.Object]]
val joinedRow = new Array[java.lang.Object](lvjSelFields.size + expRowArray.length)

// Add row fields from LateralViewForward
val lvjSelFieldsLen = lvjSelFields.size
var i = 0
while (i < lvjSelFields.size) {
while (i < lvjSelFieldsLen) {
if (lvjSelEval != null) {
joinedRow(i) = lvjSelEval(i).evaluate(row)
} else {
Expand All @@ -157,7 +158,7 @@ class LateralViewJoinOperator extends NaryOperator[LateralViewJoinDesc] {
// Append element(s) from explode
i = 0
while (i < expRowArray.length) {
joinedRow(i + lvjSelFields.size) = expRowArray(i)
joinedRow(i + lvjSelFieldsLen) = expRowArray(i)
i += 1
}
joinedRow
Expand Down
49 changes: 29 additions & 20 deletions src/main/scala/shark/execution/optimization/ColumnPruner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,29 +65,12 @@ class ColumnPruner(@transient op: TopOperator[_], @transient tbl: Table) extends
cols: HashSet[String],
parentOp: Operator[_] = null) {

def nullGuard[T](s: JList[T]): Seq[T] = {
if (s == null) Seq[T]() else s
}

op match {
case selOp: SelectOperator =>
val cnf: SelectDesc = selOp.getConf
//Select Descriptor contains SelectConf, which holds the list of columns
//referenced by the select op
if (cnf != null) {
if (cnf.isSelStarNoCompute) {
// For star, return immediately since there is no point doing any further pruning.
cols.clear()
cols += "*"
return cols
} else {
val evals = nullGuard(cnf.getColList)
cols ++= (HashSet() ++ evals).flatMap(x => nullGuard(x.getCols))
}
}
cols ++= getColumnsFromSelectOp(selOp)

case filterOp: FilterOperator =>
val cnf:FilterDesc = filterOp.getConf
val cnf: FilterDesc = filterOp.getConf
//FilterDesc has predicates, which are the columns involved in predicate operations
if (cnf != null) {
cols ++= (HashSet() ++ nullGuard(cnf.getPredicate.getCols))
Expand Down Expand Up @@ -120,6 +103,12 @@ class ColumnPruner(@transient op: TopOperator[_], @transient tbl: Table) extends
cols ++= (HashSet() ++ keys).flatMap(x => nullGuard(x.getCols))
}

case lvj: LateralViewJoinOperator =>
lvj.parentOperators.head match {
case selOp: SelectOperator => cols ++= getColumnsFromSelectOp(selOp)
case _ => // Do nothing
}

case _ =>
}

Expand All @@ -136,11 +125,31 @@ class ColumnPruner(@transient op: TopOperator[_], @transient tbl: Table) extends
// Note that the actual Select Op in that branch only contains the Array evaluators, so we
// can't column prune based on it.
cols += "*"
return cols
} else {
computeColumnsToKeep(childOp, cols, op)
}
currentChildIndex = currentChildIndex + 1
}
}

private def nullGuard[T](s: JList[T]): Seq[T] = {
if (s == null) Seq[T]() else s
}

private def getColumnsFromSelectOp(selOp: SelectOperator): Set[String] = {
val cnf: SelectDesc = selOp.getConf
// Select Descriptor contains SelectConf, which holds the list of columns
// referenced by the select op
if (cnf != null) {
if (cnf.isSelStarNoCompute) {
// For star, return immediately since there is no point doing any further pruning.
Set("*")
} else {
val evals = nullGuard(cnf.getColList)
Set() ++ evals.flatMap(x => nullGuard(x.getCols))
}
} else {
Set.empty
}
}
}

0 comments on commit f8c16f2

Please sign in to comment.