Review Board 1.7.22


Change ORC tree readers to return batches of rows instead of a row

Review Request #10712 - Created April 22, 2013 and updated

Sarvesh Sakalanaga
vectorization
HIVE-4370
Reviewers
hive
hive-git
The patch contains changes to ORC reader to return a batch of rows instead of a row. A new method called nextBatch() is added to ORC reader and tree readers of ORC. Currently only int,long,short,double,float,string and struct support batch processing.

 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
Revision 246170d New Change
[20] 15 lines
[+20]
16
 * limitations under the License.
16
 * limitations under the License.
17
 */
17
 */
18

    
   
18

   
19
package org.apache.hadoop.hive.ql.exec.vector;
19
package org.apache.hadoop.hive.ql.exec.vector;
20

    
   
20

   

    
   
21
import org.apache.hadoop.io.Text;
21
import org.apache.hadoop.io.Writable;
22
import org.apache.hadoop.io.Writable;
22

    
   
23

   
23
/**
24
/**
24
 * This class supports string and binary data by value reference -- i.e. each field is 
25
 * This class supports string and binary data by value reference -- i.e. each field is
25
 * explicitly present, as opposed to provided by a dictionary reference.
26
 * explicitly present, as opposed to provided by a dictionary reference.
26
 * In some cases, all the values will be in the same byte array to begin with,
27
 * In some cases, all the values will be in the same byte array to begin with,
27
 * but this need not be the case. If each value is in a separate byte 
28
 * but this need not be the case. If each value is in a separate byte
28
 * array to start with, or not all of the values are in the same original
29
 * array to start with, or not all of the values are in the same original
29
 * byte array, you can still assign data by reference into this column vector.
30
 * byte array, you can still assign data by reference into this column vector.
30
 * This gives flexibility to use this in multiple situations. 
31
 * This gives flexibility to use this in multiple situations.
31
 * <p>
32
 * <p>
32
 * When setting data by reference, the caller
33
 * When setting data by reference, the caller
33
 * is responsible for allocating the byte arrays used to hold the data.
34
 * is responsible for allocating the byte arrays used to hold the data.
34
 * You can also set data by value, as long as you call the initBuffer() method first.
35
 * You can also set data by value, as long as you call the initBuffer() method first.
35
 * You can mix "by value" and "by reference" in the same column vector,
36
 * You can mix "by value" and "by reference" in the same column vector,
36
 * though that use is probably not typical.
37
 * though that use is probably not typical.
37
 */
38
 */
38
public class BytesColumnVector extends ColumnVector {
39
public class BytesColumnVector extends ColumnVector {
39
  public byte[][] vector; 
40
  public byte[][] vector;
40
  public int[] start;          // start offset of each field
41
  public int[] start;          // start offset of each field
41
  
42

   
42
  /*
43
  /*
43
   * The length of each field. If the value repeats for every entry, then it is stored 
44
   * The length of each field. If the value repeats for every entry, then it is stored
44
   * in vector[0] and isRepeating from the superclass is set to true.
45
   * in vector[0] and isRepeating from the superclass is set to true.
45
   */
46
   */
46
  public int[] length; 
47
  public int[] length;
47
  private byte[] buffer;   // optional buffer to use when actually copying in data
48
  private byte[] buffer;   // optional buffer to use when actually copying in data
48
  private int nextFree;    // next free position in buffer
49
  private int nextFree;    // next free position in buffer
49
  
50

   
50
  // Estimate that there will be 16 bytes per entry
51
  // Estimate that there will be 16 bytes per entry
51
  static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
52
  static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
52
  
53

   
53
  // Proportion of extra space to provide when allocating more buffer space. 
54
  // Proportion of extra space to provide when allocating more buffer space.
54
  static final float EXTRA_SPACE_FACTOR = (float) 1.2;
55
  static final float EXTRA_SPACE_FACTOR = (float) 1.2;
55
  
56

   
56
  /**
57
  /**
57
   * Use this constructor for normal operation.
58
   * Use this constructor for normal operation.
58
   * All column vectors should be the default size normally.
59
   * All column vectors should be the default size normally.
59
   */
60
   */
60
  public BytesColumnVector() {
61
  public BytesColumnVector() {
61
    this(VectorizedRowBatch.DEFAULT_SIZE);
62
    this(VectorizedRowBatch.DEFAULT_SIZE);
62
  }
63
  }
63
  
64

   
64
  /**
65
  /**
65
   * Don't call this constructor except for testing purposes.
66
   * Don't call this constructor except for testing purposes.
66
   * 
67
   *
67
   * @param size  number of elements in the column vector
68
   * @param size  number of elements in the column vector
68
   */
69
   */
69
  public BytesColumnVector(int size) {
70
  public BytesColumnVector(int size) {
70
    super(size);
71
    super(size);
71
    vector = new byte[size][];
72
    vector = new byte[size][];
72
    start = new int[size];
73
    start = new int[size];
73
    length = new int[size]; 
74
    length = new int[size];
74
  }
75
  }
75
  
76

   
76
  /** Set a field by reference.
77
  /** Set a field by reference.
77
   *  
78
   *
78
   * @param elementNum index within column vector to set
79
   * @param elementNum index within column vector to set
79
   * @param sourceBuf container of source data
80
   * @param sourceBuf container of source data
80
   * @param start start byte position within source
81
   * @param start start byte position within source
81
   * @param length  length of source byte sequence
82
   * @param length  length of source byte sequence
82
   */
83
   */
83
  public void setRef(int elementNum, byte[] sourceBuf, int start, int length) {
84
  public void setRef(int elementNum, byte[] sourceBuf, int start, int length) {
84
    vector[elementNum] = sourceBuf;
85
    vector[elementNum] = sourceBuf;
85
    this.start[elementNum] = start;
86
    this.start[elementNum] = start;
86
    this.length[elementNum] = length;
87
    this.length[elementNum] = length;
87
  }
88
  }
88
  
89

   
89
  /** 
90
  /**
90
   * You must call initBuffer first before using setVal().
91
   * You must call initBuffer first before using setVal().
91
   * Provide the estimated number of bytes needed to hold
92
   * Provide the estimated number of bytes needed to hold
92
   * a full column vector worth of byte string data.
93
   * a full column vector worth of byte string data.
93
   * 
94
   *
94
   * @param estimatedValueSize  Estimated size of buffer space needed
95
   * @param estimatedValueSize  Estimated size of buffer space needed
95
   */
96
   */
96
  public void initBuffer(int estimatedValueSize) {
97
  public void initBuffer(int estimatedValueSize) {
97
    nextFree = 0;
98
    nextFree = 0;
98
    
99

   
99
    // if buffer is already allocated, keep using it, don't re-allocate
100
    // if buffer is already allocated, keep using it, don't re-allocate
100
    if (buffer != null) {
101
    if (buffer != null) {
101
      return;
102
      return;
102
    }
103
    }
103
    
104

   
104
    // allocate a little extra space to limit need to re-allocate
105
    // allocate a little extra space to limit need to re-allocate
105
    int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR);
106
    int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR);
106
    if (bufferSize < DEFAULT_BUFFER_SIZE) {
107
    if (bufferSize < DEFAULT_BUFFER_SIZE) {
107
      bufferSize = DEFAULT_BUFFER_SIZE;
108
      bufferSize = DEFAULT_BUFFER_SIZE;
108
    }
109
    }
109
    buffer = new byte[bufferSize]; 
110
    buffer = new byte[bufferSize];
110
  }
111
  }
111
  
112

   
112
  /**
113
  /**
113
   * Initialize buffer to default size.
114
   * Initialize buffer to default size.
114
   */
115
   */
115
  public void initBuffer() {
116
  public void initBuffer() {
116
    initBuffer(0);
117
    initBuffer(0);
117
  }
118
  }
118
  
119

   
119
  /**
120
  /**
120
   * @return amount of buffer space currently allocated
121
   * @return amount of buffer space currently allocated
121
   */
122
   */
122
  public int bufferSize() {
123
  public int bufferSize() {
123
    if (buffer == null) {
124
    if (buffer == null) {
124
      return 0;
125
      return 0;
125
    }
126
    }
126
    return buffer.length;
127
    return buffer.length;
127
  }
128
  }
128
  
129

   
129
  /**
130
  /**
130
   * Set a field by actually copying in to a local buffer.
131
   * Set a field by actually copying in to a local buffer.
131
   * If you must actually copy data in to the array, use this method.
132
   * If you must actually copy data in to the array, use this method.
132
   * DO NOT USE this method unless it's not practical to set data by reference with setRef().
133
   * DO NOT USE this method unless it's not practical to set data by reference with setRef().
133
   * Setting data by reference tends to run a lot faster than copying data in.
134
   * Setting data by reference tends to run a lot faster than copying data in.
134
   * 
135
   *
135
   * @param elementNum index within column vector to set
136
   * @param elementNum index within column vector to set
136
   * @param sourceBuf container of source data
137
   * @param sourceBuf container of source data
137
   * @param start start byte position within source
138
   * @param start start byte position within source
138
   * @param length  length of source byte sequence
139
   * @param length  length of source byte sequence
139
   */
140
   */
[+20] [20] 5 lines
[+20] [+] public void setVal(int elementNum, byte[] sourceBuf, int start, int length) {
145
    vector[elementNum] = buffer;
146
    vector[elementNum] = buffer;
146
    this.start[elementNum] = nextFree;
147
    this.start[elementNum] = nextFree;
147
    this.length[elementNum] = length;
148
    this.length[elementNum] = length;
148
    nextFree += length;
149
    nextFree += length;
149
  }
150
  }
150
  
151

   
151
  /**
152
  /**
152
   * Increase buffer space enough to accommodate next element.
153
   * Increase buffer space enough to accommodate next element.
153
   * This uses an exponential increase mechanism to rapidly 
154
   * This uses an exponential increase mechanism to rapidly
154
   * increase buffer size to enough to hold all data.
155
   * increase buffer size to enough to hold all data.
155
   * As batches get re-loaded, buffer space allocated will quickly
156
   * As batches get re-loaded, buffer space allocated will quickly
156
   * stabilize.
157
   * stabilize.
157
   * 
158
   *
158
   * @param nextElemLength size of next element to be added
159
   * @param nextElemLength size of next element to be added
159
   */
160
   */
160
  public void increaseBufferSpace(int nextElemLength) {
161
  public void increaseBufferSpace(int nextElemLength) {
161
    
162

   
162
    // Keep doubling buffer size until there will be enough space for next element.
163
    // Keep doubling buffer size until there will be enough space for next element.
163
    int newLength = 2 * buffer.length; 
164
    int newLength = 2 * buffer.length;
164
    while((nextFree + nextElemLength) > newLength) {
165
    while((nextFree + nextElemLength) > newLength) {
165
      newLength *= 2;
166
      newLength *= 2;
166
    }
167
    }
167
    
168

   
168
    // Allocate new buffer, copy data to it, and set buffer to new buffer.
169
    // Allocate new buffer, copy data to it, and set buffer to new buffer.
169
    byte[] newBuffer = new byte[newLength];
170
    byte[] newBuffer = new byte[newLength];
170
    System.arraycopy(buffer, 0, newBuffer, 0, nextFree);
171
    System.arraycopy(buffer, 0, newBuffer, 0, nextFree);
171
    buffer = newBuffer;
172
    buffer = newBuffer;
172
  }
173
  }
173

    
   
174

   
174
  @Override
175
  @Override
175
  public Writable getWritableObject(int index) {
176
  public Writable getWritableObject(int index) {
176
    
177
    Text result = null;
177
    // TODO finish this
178
    if (!isNull[index]) {
178
    throw new UnsupportedOperationException("unfinished");
179
      result = new Text();

    
   
180
      result.append(vector[index], start[index], length[index]);

    
   
181
    }

    
   
182
    return result;
179
  }
183
  }
180
  

   
181
}
184
}
ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
Revision fc4e53b New Change
 
ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
Revision 05240ce New Change
 
ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
Revision d044cd8 New Change
 
ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
Revision 2825c64 New Change
 
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
New File
 
  1. ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java: Loading...
  2. ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java: Loading...
  3. ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java: Loading...
  4. ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java: Loading...
  5. ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java: Loading...
  6. ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java: Loading...