Patch HIVE-13602 and fix the position alias processing error when CBO is enabled but fails #66

Status: Open
Wants to merge 3 commits into base branch cosmos-hive-1.2.1
11 changes: 11 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -175,4 +176,14 @@ public static StructObjectInspector getVCSObjectInspector(List<VirtualColumn> vc
}
return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
}

public static boolean isVirtualColumnBasedOnAlias(ColumnInfo column) {
// Not using method column.getIsVirtualCol() because partitioning columns
// are also treated as virtual columns in ColumnInfo.
if (column.getAlias() != null
&& VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(column.getAlias().toUpperCase())) {
return true;
}
return false;
}
}
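As a rough illustration of the new helper (not part of the patch; the class name and column values below are made up for the example), a column is treated as virtual only when its alias matches one of the known virtual column names:

import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class VirtualColumnAliasCheckSketch {
  public static void main(String[] args) {
    // A column aliased to a known virtual column name is recognized as virtual.
    ColumnInfo vc = new ColumnInfo("_col0", TypeInfoFactory.longTypeInfo, "t", false);
    vc.setAlias("BLOCK__OFFSET__INSIDE__FILE");
    System.out.println(VirtualColumn.isVirtualColumnBasedOnAlias(vc)); // expected: true

    // An ordinary alias is not, even though partition columns are also flagged
    // as "virtual" by ColumnInfo.getIsVirtualCol().
    ColumnInfo regular = new ColumnInfo("_col1", TypeInfoFactory.stringTypeInfo, "t", false);
    regular.setAlias("key");
    System.out.println(VirtualColumn.isVirtualColumnBasedOnAlias(regular)); // expected: false
  }
}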
ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java
@@ -32,7 +32,13 @@
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;

/**
@@ -59,37 +65,6 @@ public Map<Operator<? extends Serializable>, Map<ColumnInfo, ExprNodeDesc>> getO
return opToConstantExprs;
}

/**
* Resolve a ColumnInfo based on given RowResolver.
*
* @param ci
* @param rr
* @param parentRR
* @return
* @throws SemanticException
*/
private ColumnInfo resolve(ColumnInfo ci, RowSchema rs, RowSchema parentRS) {
// Resolve new ColumnInfo from <tableAlias, alias>
String alias = ci.getAlias();
if (alias == null) {
alias = ci.getInternalName();
}
String tblAlias = ci.getTabAlias();
ColumnInfo rci = rs.getColumnInfo(tblAlias, alias);
if (rci == null && rs.getTableNames().size() == 1 &&
parentRS.getTableNames().size() == 1) {
rci = rs.getColumnInfo(rs.getTableNames().iterator().next(),
alias);
}
if (rci == null) {
return null;
}
LOG.debug("Resolved "
+ ci.getTabAlias() + "." + ci.getAlias() + " as "
+ rci.getTabAlias() + "." + rci.getAlias() + " with rs: " + rs);
return rci;
}

/**
* Get propagated constant map from parents.
*
@@ -115,58 +90,130 @@ public Map<ColumnInfo, ExprNodeDesc> getPropagatedConstants(
return constants;
}

-    if (op instanceof UnionOperator) {
-      String alias = rs.getSignature().get(0).getTabAlias();
-      // find intersection
-      Map<ColumnInfo, ExprNodeDesc> intersection = null;
-      for (Operator<?> parent : op.getParentOperators()) {
-        Map<ColumnInfo, ExprNodeDesc> unionConst = opToConstantExprs.get(parent);
-        LOG.debug("Constant of op " + parent.getOperatorId() + " " + unionConst);
-        if (intersection == null) {
-          intersection = new HashMap<ColumnInfo, ExprNodeDesc>();
-          for (Entry<ColumnInfo, ExprNodeDesc> e : unionConst.entrySet()) {
-            ColumnInfo ci = new ColumnInfo(e.getKey());
-            ci.setTabAlias(alias);
-            intersection.put(ci, e.getValue());
-          }
-        } else {
-          Iterator<Entry<ColumnInfo, ExprNodeDesc>> itr = intersection.entrySet().iterator();
-          while (itr.hasNext()) {
-            Entry<ColumnInfo, ExprNodeDesc> e = itr.next();
-            boolean found = false;
-            for (Entry<ColumnInfo, ExprNodeDesc> f : opToConstantExprs.get(parent).entrySet()) {
-              if (e.getKey().getInternalName().equals(f.getKey().getInternalName())) {
-                if (e.getValue().isSame(f.getValue())) {
-                  found = true;
-                }
-              }
-            }
-            if (!found) {
-              itr.remove();
-            }
-          }
-        }
-        if (intersection.isEmpty()) {
-          return intersection;
-        }
-      }
-      LOG.debug("Propagated union constants:" + intersection);
-      return intersection;
-    }
-
-    for (Operator<? extends Serializable> parent : op.getParentOperators()) {
-      Map<ColumnInfo, ExprNodeDesc> c = opToConstantExprs.get(parent);
-      for (Entry<ColumnInfo, ExprNodeDesc> e : c.entrySet()) {
-        ColumnInfo ci = e.getKey();
-        ColumnInfo rci = null;
-        ExprNodeDesc constant = e.getValue();
-        rci = resolve(ci, rs, parent.getSchema());
-        if (rci != null) {
-          constants.put(rci, constant);
-        } else {
-          LOG.debug("Can't resolve " + ci.getTabAlias() + "." + ci.getAlias() +
-              "(" + ci.getInternalName() + ") from rs:" + rs);
-        }
-      }
-    }
+    // The previous approach resolved constants through tableAlias and colAlias, which is
+    // unsafe, especially when CBO generates derived table names; see HIVE-13602.
+    // For correctness, we only trust the column expression map (colExprMap).
+    // We assume that CBO has already done constant propagation before this function is
+    // called, which helps performance.
+    // UnionOperator, LimitOperator and FilterOperator are special: their schemas should
+    // already be column-position aligned with their parents'.
+
+    List<Map<Integer, ExprNodeDesc>> parentsToConstant = new ArrayList<>();
+    boolean areAllParentsContainConstant = true;
+    boolean noParentsContainConstant = true;
+    for (Operator<?> parent : op.getParentOperators()) {
+      Map<ColumnInfo, ExprNodeDesc> constMap = opToConstantExprs.get(parent);
+      if (constMap == null) {
+        LOG.debug("Constant of Op " + parent.getOperatorId() + " is not found");
+        areAllParentsContainConstant = false;
+      } else {
+        noParentsContainConstant = false;
+        Map<Integer, ExprNodeDesc> map = new HashMap<>();
+        for (Entry<ColumnInfo, ExprNodeDesc> entry : constMap.entrySet()) {
+          map.put(parent.getSchema().getPosition(entry.getKey().getInternalName()),
+              entry.getValue());
+        }
+        parentsToConstant.add(map);
+        LOG.debug("Constant of Op " + parent.getOperatorId() + " " + constMap);
+      }
+    }
+    if (noParentsContainConstant) {
+      return constants;
+    }
+
+    ArrayList<ColumnInfo> signature = op.getSchema().getSignature();
+    if (op instanceof LimitOperator || op instanceof FilterOperator) {
+      // there should be only one parent.
+      if (op.getParentOperators().size() == 1) {
+        Map<Integer, ExprNodeDesc> parentToConstant = parentsToConstant.get(0);
+        for (int index = 0; index < signature.size(); index++) {
+          if (parentToConstant.containsKey(index)) {
+            constants.put(signature.get(index), parentToConstant.get(index));
+          }
+        }
+      }
+    } else if (op instanceof UnionOperator && areAllParentsContainConstant) {
+      for (int index = 0; index < signature.size(); index++) {
+        ExprNodeDesc constant = null;
+        for (Map<Integer, ExprNodeDesc> parentToConstant : parentsToConstant) {
+          if (!parentToConstant.containsKey(index)) {
+            // this parent does not contain a constant at this position;
+            // move on to the next position.
+            constant = null;
+            break;
+          } else {
+            if (constant == null) {
+              constant = parentToConstant.get(index);
+            } else {
+              // compare if they are the same constant.
+              ExprNodeDesc nextConstant = parentToConstant.get(index);
+              if (!nextConstant.isSame(constant)) {
+                // they are not the same constant, e.g. a union all of 1 and 2.
+                constant = null;
+                break;
+              }
+            }
+          }
+        }
+        // we have checked all the parents for the "index" position.
+        if (constant != null) {
+          constants.put(signature.get(index), constant);
+        }
+      }
+    } else if (op instanceof JoinOperator) {
+      JoinOperator joinOp = (JoinOperator) op;
+      Iterator<Entry<Byte, List<ExprNodeDesc>>> itr = joinOp.getConf().getExprs().entrySet()
+          .iterator();
+      while (itr.hasNext()) {
+        Entry<Byte, List<ExprNodeDesc>> e = itr.next();
+        int tag = e.getKey();
+        Operator<?> parent = op.getParentOperators().get(tag);
+        List<ExprNodeDesc> exprs = e.getValue();
+        if (exprs == null) {
+          continue;
+        }
+        for (ExprNodeDesc expr : exprs) {
+          // we are only interested in ExprNodeColumnDesc
+          if (expr instanceof ExprNodeColumnDesc) {
+            String parentColName = ((ExprNodeColumnDesc) expr).getColumn();
+            // find this parentColName in its parent's rs
+            int parentPos = parent.getSchema().getPosition(parentColName);
+            if (parentsToConstant.get(tag).containsKey(parentPos)) {
+              // this position in the parent is a constant;
+              // reverse look up colExprMap to find the childColName
+              if (op.getColumnExprMap() != null && op.getColumnExprMap().entrySet() != null) {
+                for (Entry<String, ExprNodeDesc> entry : op.getColumnExprMap().entrySet()) {
+                  if (entry.getValue().isSame(expr)) {
+                    // now propagate the constant from the parent to the child
+                    constants.put(signature.get(op.getSchema().getPosition(entry.getKey())),
+                        parentsToConstant.get(tag).get(parentPos));
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    } else {
+      // there should be only one parent.
+      if (op.getParentOperators().size() == 1) {
+        Operator<?> parent = op.getParentOperators().get(0);
+        if (op.getColumnExprMap() != null && op.getColumnExprMap().entrySet() != null) {
+          for (Entry<String, ExprNodeDesc> entry : op.getColumnExprMap().entrySet()) {
+            ExprNodeDesc expr = entry.getValue();
+            if (expr instanceof ExprNodeColumnDesc) {
+              String parentColName = ((ExprNodeColumnDesc) expr).getColumn();
+              // find this parentColName in its parent's rs
+              int parentPos = parent.getSchema().getPosition(parentColName);
+              if (parentsToConstant.get(0).containsKey(parentPos)) {
+                // this position in the parent is a constant;
+                // now propagate the constant from the parent to the child
+                constants.put(signature.get(op.getSchema().getPosition(entry.getKey())),
+                    parentsToConstant.get(0).get(parentPos));
+              }
+            }
+          }
+        }
+      }
+    }
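The gist of the rework above: each parent's constants are keyed by column position in that parent's schema, and they flow to the child through the child's column expression map rather than through table and column aliases. A standalone sketch of that lookup, using plain maps instead of the Hive classes (all names and values here are illustrative):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PositionBasedPropagationSketch {
  public static void main(String[] args) {
    // Parent schema: output column internal names, in position order.
    List<String> parentSchema = List.of("_col0", "_col1", "_col2");
    // Constants known at the parent, keyed by position in the parent schema.
    Map<Integer, String> parentConstants = Map.of(1, "100");

    // Child's column expression map: child column name -> parent column it reads.
    Map<String, String> childColExprMap = Map.of("key", "_col0", "value", "_col1");

    // Propagate: a child column is constant iff the parent column it maps to
    // sits at a position the parent reports as constant.
    Map<String, String> childConstants = new HashMap<>();
    for (Map.Entry<String, String> e : childColExprMap.entrySet()) {
      int parentPos = parentSchema.indexOf(e.getValue());
      if (parentConstants.containsKey(parentPos)) {
        childConstants.put(e.getKey(), parentConstants.get(parentPos));
      }
    }
    System.out.println(childConstants); // expected: {value=100}
  }
}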
ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -782,6 +783,18 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object..
Map<ColumnInfo, ExprNodeDesc> colToConstants = cppCtx.getPropagatedConstants(op);
cppCtx.getOpToConstantExprs().put(op, colToConstants);

RowSchema rs = op.getSchema();
if (op.getColumnExprMap() != null && rs != null) {
for (ColumnInfo colInfo : rs.getSignature()) {
if (!VirtualColumn.isVirtualColumnBasedOnAlias(colInfo)) {
ExprNodeDesc expr = op.getColumnExprMap().get(colInfo.getInternalName());
if (expr instanceof ExprNodeConstantDesc) {
colToConstants.put(colInfo, expr);
}
}
}
}

if (colToConstants.isEmpty()) {
return null;
}
Expand Down Expand Up @@ -819,6 +832,17 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object..
Operator<? extends Serializable> op = (Operator<? extends Serializable>) nd;
Map<ColumnInfo, ExprNodeDesc> constants = cppCtx.getPropagatedConstants(op);
cppCtx.getOpToConstantExprs().put(op, constants);
RowSchema rs = op.getSchema();
if (op.getColumnExprMap() != null && rs != null) {
for (ColumnInfo colInfo : rs.getSignature()) {
if (!VirtualColumn.isVirtualColumnBasedOnAlias(colInfo)) {
ExprNodeDesc expr = op.getColumnExprMap().get(colInfo.getInternalName());
if (expr instanceof ExprNodeConstantDesc) {
constants.put(colInfo, expr);
}
}
}
}
if (constants.isEmpty()) {
return null;
}
@@ -870,6 +894,12 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object..
}
}
colList.set(i, newCol);
if (newCol instanceof ExprNodeConstantDesc && op.getSchema() != null) {
ColumnInfo colInfo = op.getSchema().getSignature().get(i);
if (!VirtualColumn.isVirtualColumnBasedOnAlias(colInfo)) {
constants.put(colInfo, newCol);
}
}
if (columnExprMap != null) {
columnExprMap.put(columnNames.get(i), newCol);
}
@@ -976,6 +1006,17 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object..
Map<ColumnInfo, ExprNodeDesc> constants = cppCtx.getPropagatedConstants(op);

cppCtx.getOpToConstantExprs().put(op, constants);
RowSchema rs = op.getSchema();
if (op.getColumnExprMap() != null && rs != null) {
for (ColumnInfo colInfo : rs.getSignature()) {
if (!VirtualColumn.isVirtualColumnBasedOnAlias(colInfo)) {
ExprNodeDesc expr = op.getColumnExprMap().get(colInfo.getInternalName());
if (expr instanceof ExprNodeConstantDesc) {
constants.put(colInfo, expr);
}
}
}
}
if (constants.isEmpty()) {
return null;
}
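The blocks added in this file repeat one pattern: after pulling in the propagated constants for an operator, also scan its column expression map and record any non-virtual output column whose defining expression is already a literal. A condensed sketch of that pattern with simplified stand-in types (illustrative, not the Hive API):

import java.util.HashMap;
import java.util.Map;

public class SeedConstantsFromColExprMapSketch {
  // Simplified stand-ins for ExprNodeDesc / ExprNodeConstantDesc / ExprNodeColumnDesc.
  interface Expr {}
  record ConstExpr(Object value) implements Expr {}
  record ColExpr(String column) implements Expr {}

  static Map<String, Expr> seedConstants(Map<String, Expr> colExprMap,
                                         Map<String, Boolean> isVirtual) {
    Map<String, Expr> constants = new HashMap<>();
    for (Map.Entry<String, Expr> e : colExprMap.entrySet()) {
      // Skip virtual columns (e.g. BLOCK__OFFSET__INSIDE__FILE); only real output
      // columns whose defining expression is a literal are recorded.
      if (!isVirtual.getOrDefault(e.getKey(), false) && e.getValue() instanceof ConstExpr) {
        constants.put(e.getKey(), e.getValue());
      }
    }
    return constants;
  }

  public static void main(String[] args) {
    Map<String, Expr> colExprMap = Map.of(
        "_col0", new ConstExpr(100),
        "_col1", new ColExpr("key"));
    System.out.println(seedConstants(colExprMap, Map.of()));
    // expected: {_col0=ConstExpr[value=100]}
  }
}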
ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -10044,7 +10044,7 @@ boolean genResolvedParseTree(ASTNode ast, PlannerContext plannerCtx) throws Sema
ctesExpanded = new ArrayList<String>();

// 1. analyze and process the position alias
processPositionAlias(ast);
// position alias processing has been moved out of this method into analyzeInternal (see below)

// 2. analyze create table command
if (ast.getToken().getType() == HiveParser.TOK_CREATETABLE) {
@@ -10130,6 +10130,10 @@ Operator genOPTree(ASTNode ast, PlannerContext plannerCtx) throws SemanticExcept
void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticException {
// 1. Generate Resolved Parse tree from syntax tree
LOG.info("Starting Semantic Analysis");

// process position aliases here, before the resolved parse tree is generated
processPositionAlias(ast);

if (!genResolvedParseTree(ast, plannerCtx)) {
return;
}
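For context, position alias processing rewrites numeric references such as GROUP BY 1 or ORDER BY 2 into the corresponding select expressions; calling it once in analyzeInternal, before the resolved parse tree is generated, keeps the rewrite independent of whether CBO later succeeds or falls back. A toy sketch of the rewrite using plain strings instead of ASTNodes (illustrative only):

import java.util.List;
import java.util.stream.Collectors;

public class PositionAliasRewriteSketch {
  // Rewrite "GROUP BY 1" / "ORDER BY 2" style positions into the select expressions.
  static List<String> resolvePositions(List<String> selectExprs, List<String> byClause) {
    return byClause.stream()
        .map(item -> item.matches("\\d+")
            ? selectExprs.get(Integer.parseInt(item) - 1) // positions are 1-based
            : item)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> select = List.of("key", "count(value)");
    System.out.println(resolvePositions(select, List.of("1")));        // [key]
    System.out.println(resolvePositions(select, List.of("2", "key"))); // [count(value), key]
  }
}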