Review Board 1.7.22


HIVE-4606 Add memory pressure flush for VectorGroupByOperator

Review Request #11747 - Created June 8, 2013 and submitted

Remus Rusanu
Vectorization
HIVE-4606
Reviewers
hive
hanson5b, jitendra
hive-git
Implement the flush under memory pressure, modeled much after the GroupByOperator implementation.

 

Diff revision 2 (Latest)

1 2
1 2

  1. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java: Loading...
  2. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java: Loading...
  3. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java: Loading...
  4. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java: Loading...
  5. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java: Loading...
  6. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java: Loading...
  7. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java: Loading...
  8. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java: Loading...
  9. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java: Loading...
  10. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxDouble.java: Loading...
  11. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxLong.java: Loading...
  12. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java: Loading...
  13. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinDouble.java: Loading...
  14. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinLong.java: Loading...
  15. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java: Loading...
  16. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopDouble.java: Loading...
  17. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopLong.java: Loading...
  18. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampDouble.java: Loading...
  19. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampLong.java: Loading...
  20. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumDouble.java: Loading...
This diff has been split across 2 pages: 1 2 >
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
Revision 030a73c New Change
[20] 15 lines
[+20]
16
 * limitations under the License.
16
 * limitations under the License.
17
 */
17
 */
18

    
   
18

   
19
package org.apache.hadoop.hive.ql.exec.vector;
19
package org.apache.hadoop.hive.ql.exec.vector;
20

    
   
20

   

    
   
21
import java.util.Arrays;

    
   
22

   

    
   
23
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;

    
   
24
import org.apache.hadoop.hive.ql.util.JavaDataModel;

    
   
25

   
21
/**
26
/**
22
 * This maps a batch to the aggregation buffers sets to use for each row (key)  
27
 * This maps a batch to the aggregation buffers sets to use for each row (key)
23
 *
28
 *
24
 */
29
 */
25
public class VectorAggregationBufferBatch {
30
public class VectorAggregationBufferBatch {
26
  
31

   
27
  /**
32
  /**
28
   * Batch sized array of aggregation buffer sets. 
33
   * Batch sized array of aggregation buffer sets.
29
   * The array is preallocated and is reused for each batch, but the individual entries
34
   * The array is preallocated and is reused for each batch, but the individual entries
30
   * will reference different aggregation buffer set from batch to batch.
35
   * will reference different aggregation buffer set from batch to batch.
31
   * the array is not reset between batches, content past this.index will be stale.
36
   * the array is not reset between batches, content past this.index will be stale.
32
   */
37
   */
33
  private VectorAggregationBufferRow[] aggregationBuffers;
38
  private final VectorAggregationBufferRow[] aggregationBuffers;
34
  
39

   
35
  /**
40
  /**
36
   * the selection vector that maps row within a batch to the 
41
   * Same as aggregationBuffers but only distinct buffers
37
   * specific aggregation buffer set to use. 

   
38
   */
42
   */
39
  private int[] selection;
43
  private final VectorAggregationBufferRow[] distinctAggregationBuffers;
40
  
44

   
41
  /**
45
  /**
42
   * versioning number gets incremented on each batch. This allows us to cache the selection
46
   * versioning number gets incremented on each batch. This allows us to cache the selection
43
   * mapping info in the aggregation buffer set themselves while still being able to 
47
   * mapping info in the aggregation buffer set themselves while still being able to
44
   * detect stale info.
48
   * detect stale info.
45
   */
49
   */
46
  private int version;
50
  private int version;
47
  
51

   
48
  /**
52
  /**
49
   * Get the number of distinct aggregation buffer sets (ie. keys) used in current batch.
53
   * Get the number of distinct aggregation buffer sets (ie. keys) used in current batch.
50
   */
54
   */
51
  private int distinctCount;
55
  private int distinctCount;
52
  
56

   

    
   
57
  /**

    
   
58
   * Memory consumed by a set of aggregation buffers

    
   
59
   */

    
   
60
  private int aggregatorsFixedSize;

    
   
61

   

    
   
62
  /**

    
   
63
   * Array of indexes for aggregators that have variable size

    
   
64
   */

    
   
65
  private int[] variableSizeAggregators;;

    
   
66

   

    
   
67
  /**

    
   
68
   * returns True if any of the aggregators has a variable size
Moved from 74

    
   
69
   * @return
Moved from 75

    
   
70
   */

    
   
71
  public boolean getHasVariableSize() {

    
   
72
    return variableSizeAggregators.length > 0;

    
   
73
  }

    
   
74

   

    
   
75
  /**

    
   
76
   * Returns the fixed size consumed by the aggregation buffers
Moved from 74

    
   
77
   * @return
Moved from 75

    
   
78
   */

    
   
79
  public int getAggregatorsFixedSize() {

    
   
80
    return aggregatorsFixedSize;

    
   
81
  }

    
   
82

   
53
  /**
83
  /**
54
   * the array of aggregation buffers for the current batch.
84
   * the array of aggregation buffers for the current batch.
55
   * content past the {@link #getDistinctBufferSetCount()} index
85
   * content past the {@link #getDistinctBufferSetCount()} index
56
   * is stale from previous batches.
86
   * is stale from previous batches.
57
   * @return
87
   * @return
58
   */
88
   */
59
  public VectorAggregationBufferRow[] getAggregationBuffers() {
89
  public VectorAggregationBufferRow[] getAggregationBuffers() {
60
    return aggregationBuffers;
90
    return aggregationBuffers;
61
  }
91
  }
62
  
92

   
63
  /**
93
  /**
64
   * number of distinct aggregation buffer sets (ie. keys) in the current batch. 
94
   * number of distinct aggregation buffer sets (ie. keys) in the current batch.
65
   * @return
95
   * @return
66
   */
96
   */
67
  public int getDistinctBufferSetCount () {
97
  public int getDistinctBufferSetCount () {
68
    return distinctCount;
98
    return distinctCount;
69
  }
99
  }
70

    
   
100

   
71
  /**
101

   
72
   * gets the selection vector to use for the current batch. This maps the batch rows by position 

   
73
   * (row number) to an index in the {@link #getAggregationBuffers()} array.

   
74
   * @return
Moved to 77

   
75
   */
Moved to 78

   
76
  public int[] getSelectionVector() {

   
77
    return selection;

   
78
  }

   
79
  

   
80
  public VectorAggregationBufferBatch() {
102
  public VectorAggregationBufferBatch() {
81
    aggregationBuffers = new VectorAggregationBufferRow[VectorizedRowBatch.DEFAULT_SIZE];
103
    aggregationBuffers = new VectorAggregationBufferRow[VectorizedRowBatch.DEFAULT_SIZE];
82
    selection = new int [VectorizedRowBatch.DEFAULT_SIZE];
104
    distinctAggregationBuffers = new VectorAggregationBufferRow[VectorizedRowBatch.DEFAULT_SIZE];
83
  }
105
  }
84
  
106

   
85
  /**
107
  /**
86
   * resets the internal aggregation buffers sets index and increments the versioning 
108
   * resets the internal aggregation buffers sets index and increments the versioning
87
   * used to optimize the selection vector population.
109
   * used to optimize the selection vector population.
88
   */
110
   */
89
  public void startBatch() {
111
  public void startBatch() {
90
    version++;
112
    version++;
91
    distinctCount = 0;
113
    distinctCount = 0;
92
  }
114
  }
93
    
115

   
94
  /**
116
  /**
95
   * assigns the given aggregation buffer set to a given batch row (by row number).
117
   * assigns the given aggregation buffer set to a given batch row (by row number).
96
   * populates the selection vector appropriately. This is where the versioning numbers
118
   * populates the selection vector appropriately. This is where the versioning numbers
97
   * play a role in determining if the index cached on the aggregation buffer set is stale. 
119
   * play a role in determining if the index cached on the aggregation buffer set is stale.
98
   */
120
   */
99
  public void mapAggregationBufferSet(VectorAggregationBufferRow bufferSet, int row) {
121
  public void mapAggregationBufferSet(VectorAggregationBufferRow bufferSet, int row) {
100
    if (version != bufferSet.getVersion()) {
122
    if (version != bufferSet.getVersion()) {
101
      bufferSet.setVersionAndIndex(version, distinctCount);
123
      bufferSet.setVersionAndIndex(version, distinctCount);

    
   
124
      distinctAggregationBuffers[distinctCount] = bufferSet;
102
      ++distinctCount;
125
      ++distinctCount;
103
    }
126
    }
104
    aggregationBuffers[row] = bufferSet;
127
    aggregationBuffers[row] = bufferSet;
105
  }
128
  }
106

    
   
129

   

    
   
130
  public void compileAggregationBatchInfo(VectorAggregateExpression[] aggregators) {

    
   
131
    JavaDataModel model = JavaDataModel.get();

    
   
132
    int[] variableSizeAggregators = new int[aggregators.length];

    
   
133
    int indexVariableSizes = 0;

    
   
134

   

    
   
135
    aggregatorsFixedSize = JavaDataModel.alignUp(

    
   
136
        model.object() +

    
   
137
        model.primitive1()*2 +

    
   
138
        model.ref(),

    
   
139
        model.memoryAlign());

    
   
140

   

    
   
141
    aggregatorsFixedSize += model.lengthForObjectArrayOfSize(aggregators.length);

    
   
142
    for(int i=0;i<aggregators.length;++i) {

    
   
143
      VectorAggregateExpression aggregator = aggregators[i];

    
   
144
      aggregatorsFixedSize += aggregator.getAggregationBufferFixedSize();

    
   
145
      if (aggregator.hasVariableSize()) {

    
   
146
        variableSizeAggregators[indexVariableSizes] = i;

    
   
147
        ++indexVariableSizes;

    
   
148
      }

    
   
149
    }

    
   
150
    this.variableSizeAggregators = Arrays.copyOfRange(

    
   
151
        variableSizeAggregators, 0, indexVariableSizes);

    
   
152
  }

    
   
153

   

    
   
154
  public int getVariableSize(int batchSize) {

    
   
155
    int variableSize = 0;

    
   
156
    for (int i=0; i< variableSizeAggregators.length; ++i) {

    
   
157
      for(int r=0; r<distinctCount; ++r) {

    
   
158
         VectorAggregationBufferRow buf = distinctAggregationBuffers[r];

    
   
159
         variableSize += buf.getAggregationBuffer(variableSizeAggregators[i]).getVariableSize();

    
   
160
      }

    
   
161
    }

    
   
162
    return (variableSize * batchSize)/distinctCount;

    
   
163
  }

    
   
164

   
107
}
165
}
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
Revision 4634731 New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
Revision 01dd7be New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
Revision 5e547c7 New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
Revision 8ab9f43 New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
Revision bb4800e New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java
Revision 607e3ad New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java
Revision 432b12e New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java
Revision b282aeb New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxDouble.java
Revision bc7f852 New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxLong.java
Revision 6ba416e New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java
Revision c5e92b6 New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinDouble.java
Revision d982fc2 New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinLong.java
Revision a8f5531 New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java
Revision ca1b840 New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopDouble.java
Revision 4d408b3 New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopLong.java
Revision 7e16a1c New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampDouble.java
Revision 00cd04d New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampLong.java
Revision ea4d894 New Change
 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumDouble.java
Revision e609a19 New Change
 
  1. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java: Loading...
  2. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java: Loading...
  3. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java: Loading...
  4. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java: Loading...
  5. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java: Loading...
  6. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java: Loading...
  7. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java: Loading...
  8. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java: Loading...
  9. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java: Loading...
  10. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxDouble.java: Loading...
  11. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxLong.java: Loading...
  12. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java: Loading...
  13. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinDouble.java: Loading...
  14. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinLong.java: Loading...
  15. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java: Loading...
  16. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopDouble.java: Loading...
  17. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopLong.java: Loading...
  18. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampDouble.java: Loading...
  19. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampLong.java: Loading...
  20. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumDouble.java: Loading...
This diff has been split across 2 pages: 1 2 >