Review Board 1.7.22


HIVE-5271: Convert join op to a map join op in the planning phase

Review Request #14349 - Created Sept. 26, 2013 and updated

Vikram Dixit Kumaraswamy
tez
Reviewers
hive
haglein
hive-git
 Convert join op to a map join op in the planning phase.

 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
New File

    
   
1
/**

    
   
2
 * Licensed to the Apache Software Foundation (ASF) under one

    
   
3
 * or more contributor license agreements.  See the NOTICE file

    
   
4
 * distributed with this work for additional information

    
   
5
 * regarding copyright ownership.  The ASF licenses this file

    
   
6
 * to you under the Apache License, Version 2.0 (the

    
   
7
 * "License"); you may not use this file except in compliance

    
   
8
 * with the License.  You may obtain a copy of the License at

    
   
9
 *

    
   
10
 *     http://www.apache.org/licenses/LICENSE-2.0

    
   
11
 *

    
   
12
 * Unless required by applicable law or agreed to in writing, software

    
   
13
 * distributed under the License is distributed on an "AS IS" BASIS,

    
   
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

    
   
15
 * See the License for the specific language governing permissions and

    
   
16
 * limitations under the License.

    
   
17
 */

    
   
18

   

    
   
19
package org.apache.hadoop.hive.ql.optimizer;

    
   
20

   

    
   
21
import java.util.Set;

    
   
22
import java.util.Stack;

    
   
23

   

    
   
24
import org.apache.commons.logging.Log;

    
   
25
import org.apache.commons.logging.LogFactory;

    
   
26
import org.apache.hadoop.hive.conf.HiveConf;

    
   
27
import org.apache.hadoop.hive.ql.exec.JoinOperator;

    
   
28
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;

    
   
29
import org.apache.hadoop.hive.ql.exec.Operator;

    
   
30
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;

    
   
31
import org.apache.hadoop.hive.ql.lib.Node;

    
   
32
import org.apache.hadoop.hive.ql.lib.NodeProcessor;

    
   
33
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;

    
   
34
import org.apache.hadoop.hive.ql.metadata.HiveException;

    
   
35
import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;

    
   
36
import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;

    
   
37
import org.apache.hadoop.hive.ql.parse.ParseContext;

    
   
38
import org.apache.hadoop.hive.ql.parse.SemanticException;

    
   
39
import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    
   
40
import org.apache.hadoop.hive.ql.plan.Statistics;

    
   
41

   

    
   
42

   

    
   
43
public class ConvertJoinMapJoin implements NodeProcessor {

    
   
44

   

    
   
45
  static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());

    
   
46

   

    
   
47
  @Override

    
   
48
  /*

    
   
49
   * (non-Javadoc)

    
   
50
   * we should ideally not modify the tree we traverse.

    
   
51
   * However, since we need to walk the tree at any time when we modify the

    
   
52
   * operator, we might as well do it here.

    
   
53
   */

    
   
54
  public Object process(Node nd, Stack<Node> stack,

    
   
55
      NodeProcessorCtx procCtx, Object... nodeOutputs)

    
   
56
      throws SemanticException {

    
   
57
    OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;

    
   
58
    if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {

    
   
59
      return null;

    
   
60
    }

    
   
61
    JoinOperator joinOp = (JoinOperator) nd;

    
   
62

   

    
   
63
    Set<Integer> bigTableCandidateSet = MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds());

    
   
64

   

    
   
65
    long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);

    
   
66
    int bigTablePosition = -1;

    
   
67

   

    
   
68
    Statistics bigInputStat = null;

    
   
69
    long totalSize = 0;

    
   
70
    int pos = 0;

    
   
71
    boolean bigTableFound = false;

    
   
72
    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {

    
   
73
      Statistics currInputStat = null;

    
   
74
      try {

    
   
75
        currInputStat = parentOp.getStatistics(context.conf);

    
   
76
      } catch (HiveException e) {

    
   
77
        return null;

    
   
78
      }

    
   
79

   

    
   
80
      long inputSize = currInputStat.getNumberOfBytes();

    
   
81
      if ((bigInputStat == null) ||

    
   
82
          ((bigInputStat != null) && (inputSize > bigInputStat.getNumberOfBytes()))) {

    
   
83
        if (bigTableFound == true) {

    
   
84
          // cannot convert to map join

    
   
85
          return null;

    
   
86
        }

    
   
87

   

    
   
88
        if (inputSize > maxSize) {

    
   
89
          if (!bigTableCandidateSet.contains(pos)) {

    
   
90
            // multiple big table candidates => cannot convert.

    
   
91
            return null;

    
   
92
          }

    
   
93

   

    
   
94
          bigTableFound = true;

    
   
95
        }

    
   
96

   

    
   
97
        if (bigInputStat != null) {

    
   
98
          totalSize += bigInputStat.getNumberOfBytes();

    
   
99
        }

    
   
100
        if (totalSize > maxSize) {

    
   
101
          // sum of small tables size in this join exceeds configured limit

    
   
102
          // hence cannot convert.

    
   
103
          return null;

    
   
104
        }

    
   
105

   

    
   
106
        if (bigTableCandidateSet.contains(pos)) {

    
   
107
          bigTablePosition = pos;

    
   
108
          bigInputStat = currInputStat;

    
   
109
        }

    
   
110
        pos++;

    
   
111
      } else {

    
   
112
        totalSize += currInputStat.getNumberOfBytes();

    
   
113
        if (totalSize > maxSize) {

    
   
114
          return null;

    
   
115
        }

    
   
116
      }

    
   
117
    }

    
   
118

   

    
   
119
    /*

    
   
120
     * Once we have decided on the map join, the tree would transform from

    
   
121
     *

    
   
122
     *        |                   |

    
   
123
     *       Join               MapJoin

    
   
124
     *       / \                /   \

    
   
125
     *      RS RS   --->      RS    TS (big table)

    
   
126
     *      /   \            /

    
   
127
     *    TS     TS         TS (small table)

    
   
128
     *

    
   
129
     * for tez.

    
   
130
     */

    
   
131

   

    
   
132
    // convert to a map join operator with this information

    
   
133
    ParseContext parseContext = context.parseContext;

    
   
134
    MapJoinOperator mapJoinOp = MapJoinProcessor.convertJoinOpMapJoinOp(parseContext.getOpParseCtx(),

    
   
135
        joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true, false);

    
   
136

   

    
   
137
    Operator<? extends OperatorDesc> parentBigTableOp = mapJoinOp.getParentOperators().get(bigTablePosition);

    
   
138
    if (parentBigTableOp instanceof ReduceSinkOperator) {

    
   
139
      mapJoinOp.getParentOperators().remove(bigTablePosition);

    
   
140
      if (!(mapJoinOp.getParentOperators().contains(

    
   
141
          parentBigTableOp.getParentOperators().get(0)))) {

    
   
142
        mapJoinOp.getParentOperators().add(bigTablePosition, parentBigTableOp.getParentOperators().get(0));

    
   
143
      }

    
   
144
      parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp);

    
   
145
      for (Operator<? extends OperatorDesc> op : mapJoinOp.getParentOperators()) {

    
   
146
        if (!(op.getChildOperators().contains(mapJoinOp))) {

    
   
147
          op.getChildOperators().add(mapJoinOp);

    
   
148
        }

    
   
149
        op.getChildOperators().remove(joinOp);

    
   
150
      }

    
   
151
    }

    
   
152

   

    
   
153
    return null;

    
   
154
  }

    
   
155
}
ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
Revision 31ca07a New Change
 
ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
Revision 7e51310 New Change
 
  1. ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java: Loading...
  2. ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java: Loading...
  3. ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java: Loading...