Review Board 1.7.22


Remove .array() assumptions from ORC RecordReaderImpl and InStream

Review Request #15097 - Created Oct. 30, 2013 and updated

Gopal V
trunk
HIVE-5663
Reviewers
hive
omalley
hive-git
This patch scrubs all ByteBuffer.wrap assumptions from the RecordReaderImpl and InStream classes.

The code still uses wrapped bytebuffers in the read path, but accesses all data using the ByteBuffer methods without resorting to .array() data.
tested with TestORCFile, -Dtestcase=TestCliDriver -Dqfile_regex=.*orc.*
ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
Revision 3164a19 New Change
[20] 26 lines
[+20]
27
    private final String name;
27
    private final String name;
28
    private final ByteBuffer[] bytes;
28
    private final ByteBuffer[] bytes;
29
    private final long[] offsets;
29
    private final long[] offsets;
30
    private final long length;
30
    private final long length;
31
    private long currentOffset;
31
    private long currentOffset;
32
    private byte[] range;
32
    private ByteBuffer range;
33
    private int currentRange;
33
    private int currentRange;
34
    private int offsetInRange;

   
35
    private int limitInRange;

   
36

    
   
34

   
37
    public UncompressedStream(String name, ByteBuffer[] input, long[] offsets,
35
    public UncompressedStream(String name, ByteBuffer[] input, long[] offsets,
38
                              long length) {
36
                              long length) {
39
      this.name = name;
37
      this.name = name;
40
      this.bytes = input;
38
      this.bytes = input;
41
      this.offsets = offsets;
39
      this.offsets = offsets;
42
      this.length = length;
40
      this.length = length;
43
      currentRange = 0;
41
      currentRange = 0;
44
      offsetInRange = 0;

   
45
      limitInRange = 0;

   
46
      currentOffset = 0;
42
      currentOffset = 0;
47
    }
43
    }
48

    
   
44

   
49
    @Override
45
    @Override
50
    public int read() {
46
    public int read() {
51
      if (offsetInRange >= limitInRange) {
47
      if (range == null || range.remaining() == 0) {
52
        if (currentOffset == length) {
48
        if (currentOffset == length) {
53
          return -1;
49
          return -1;
54
        }
50
        }
55
        seek(currentOffset);
51
        seek(currentOffset);
56
      }
52
      }
57
      currentOffset += 1;
53
      currentOffset += 1;
58
      return 0xff & range[offsetInRange++];
54
      return 0xff & range.get();
59
    }
55
    }
60

    
   
56

   
61
    @Override
57
    @Override
62
    public int read(byte[] data, int offset, int length) {
58
    public int read(byte[] data, int offset, int length) {
63
      if (offsetInRange >= limitInRange) {
59
      if (range == null || range.remaining() == 0) {
64
        if (currentOffset == this.length) {
60
        if (currentOffset == this.length) {
65
          return -1;
61
          return -1;
66
        }
62
        }
67
        seek(currentOffset);
63
        seek(currentOffset);
68
      }
64
      }
69
      int actualLength = Math.min(length, limitInRange - offsetInRange);
65
      int actualLength = Math.min(length, range.remaining());
70
      System.arraycopy(range, offsetInRange, data, offset, actualLength);
66
      range.get(data, offset, actualLength);
71
      offsetInRange += actualLength;

   
72
      currentOffset += actualLength;
67
      currentOffset += actualLength;
73
      return actualLength;
68
      return actualLength;
74
    }
69
    }
75

    
   
70

   
76
    @Override
71
    @Override
77
    public int available() {
72
    public int available() {
78
      if (offsetInRange < limitInRange) {
73
      if (range != null && range.remaining() > 0) {
79
        return limitInRange - offsetInRange;
74
        return range.remaining();
80
      }
75
      }
81
      return (int) (length - currentOffset);
76
      return (int) (length - currentOffset);
82
    }
77
    }
83

    
   
78

   
84
    @Override
79
    @Override
85
    public void close() {
80
    public void close() {
86
      currentRange = bytes.length;
81
      currentRange = bytes.length;
87
      currentOffset = length;
82
      currentOffset = length;

    
   
83
      // explicit de-ref of bytes[]

    
   
84
      for(int i = 0; i < bytes.length; i++) {

    
   
85
        bytes[i] = null;

    
   
86
      }
88
    }
87
    }
89

    
   
88

   
90
    @Override
89
    @Override
91
    public void seek(PositionProvider index) throws IOException {
90
    public void seek(PositionProvider index) throws IOException {
92
      seek(index.getNext());
91
      seek(index.getNext());
93
    }
92
    }
94

    
   
93

   
95
    public void seek(long desired) {
94
    public void seek(long desired) {
96
      for(int i = 0; i < bytes.length; ++i) {
95
      for(int i = 0; i < bytes.length; ++i) {
97
        if (offsets[i] <= desired &&
96
        if (offsets[i] <= desired &&
98
            desired - offsets[i] < bytes[i].remaining()) {
97
            desired - offsets[i] < bytes[i].remaining()) {
99
          currentOffset = desired;
98
          currentOffset = desired;
100
          currentRange = i;
99
          currentRange = i;
101
          this.range = bytes[i].array();
100
          this.range = bytes[i].duplicate();
102
          offsetInRange = bytes[i].arrayOffset() + bytes[i].position();
101
          int pos = range.position();
103
          limitInRange = bytes[i].arrayOffset() + bytes[i].limit();
102
          pos += (int)(desired - offsets[i]); // this is why we duplicate
104
          offsetInRange += desired - offsets[i];
103
          this.range.position(pos);
105
          return;
104
          return;
106
        }
105
        }
107
      }
106
      }
108
      throw new IllegalArgumentException("Seek in " + name + " to " +
107
      throw new IllegalArgumentException("Seek in " + name + " to " +
109
        desired + " is outside of the data");
108
        desired + " is outside of the data");
110
    }
109
    }
111

    
   
110

   
112
    @Override
111
    @Override
113
    public String toString() {
112
    public String toString() {
114
      return "uncompressed stream " + name + " position: " + currentOffset +
113
      return "uncompressed stream " + name + " position: " + currentOffset +
115
          " length: " + length + " range: " + currentRange +
114
          " length: " + length + " range: " + currentRange +
116
          " offset: " + offsetInRange + " limit: " + limitInRange;
115
          " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
117
    }
116
    }
118
  }
117
  }
119

    
   
118

   
120
  private static class CompressedStream extends InStream {
119
  private static class CompressedStream extends InStream {
121
    private final String name;
120
    private final String name;
122
    private final ByteBuffer[] bytes;
121
    private final ByteBuffer[] bytes;
123
    private final long[] offsets;
122
    private final long[] offsets;
124
    private final int bufferSize;
123
    private final int bufferSize;
125
    private final long length;
124
    private final long length;
126
    private ByteBuffer uncompressed = null;
125
    private ByteBuffer uncompressed;
127
    private final CompressionCodec codec;
126
    private final CompressionCodec codec;
128
    private byte[] compressed = null;
127
    private ByteBuffer compressed;
129
    private long currentOffset;
128
    private long currentOffset;
130
    private int currentRange;
129
    private int currentRange;
131
    private int offsetInCompressed;

   
132
    private int limitInCompressed;

   
133
    private boolean isUncompressedOriginal;
130
    private boolean isUncompressedOriginal;

    
   
131
    private boolean isDirect = false;
134

    
   
132

   
135
    public CompressedStream(String name, ByteBuffer[] input,
133
    public CompressedStream(String name, ByteBuffer[] input,
136
                            long[] offsets, long length,
134
                            long[] offsets, long length,
137
                            CompressionCodec codec, int bufferSize
135
                            CompressionCodec codec, int bufferSize
138
                           ) {
136
                           ) {
139
      this.bytes = input;
137
      this.bytes = input;
140
      this.name = name;
138
      this.name = name;
141
      this.codec = codec;
139
      this.codec = codec;
142
      this.length = length;
140
      this.length = length;

    
   
141
      if(this.length > 0) {

    
   
142
        isDirect = this.bytes[0].isDirect();

    
   
143
      }
143
      this.offsets = offsets;
144
      this.offsets = offsets;
144
      this.bufferSize = bufferSize;
145
      this.bufferSize = bufferSize;
145
      currentOffset = 0;
146
      currentOffset = 0;
146
      currentRange = 0;
147
      currentRange = 0;
147
      offsetInCompressed = 0;

   
148
      limitInCompressed = 0;

   
149
    }
148
    }
150

    
   
149

   

    
   
150
    private ByteBuffer allocateBuffer(int size) {

    
   
151
      // TODO: use the same pool as the ORC readers

    
   
152
      if(isDirect == true) {

    
   
153
        return ByteBuffer.allocateDirect(size);

    
   
154
      } else {

    
   
155
        return ByteBuffer.allocate(size);

    
   
156
      }

    
   
157
    }

    
   
158

   
151
    private void readHeader() throws IOException {
159
    private void readHeader() throws IOException {
152
      if (compressed == null || offsetInCompressed >= limitInCompressed) {
160
      if (compressed == null || compressed.remaining() <= 0) {
153
        seek(currentOffset);
161
        seek(currentOffset);
154
      }
162
      }
155
      if (limitInCompressed - offsetInCompressed > OutStream.HEADER_SIZE) {
163
      if (compressed.remaining() > OutStream.HEADER_SIZE) {
156
        int chunkLength = ((0xff & compressed[offsetInCompressed + 2]) << 15) |
164
        int b0 = compressed.get() & 0xff;
157
          ((0xff & compressed[offsetInCompressed + 1]) << 7) |
165
        int b1 = compressed.get() & 0xff;
158
            ((0xff & compressed[offsetInCompressed]) >> 1);
166
        int b2 = compressed.get() & 0xff;

    
   
167
        boolean isOriginal = (b0 & 0x01) == 1;

    
   
168
        int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);

    
   
169

   
159
        if (chunkLength > bufferSize) {
170
        if (chunkLength > bufferSize) {
160
          throw new IllegalArgumentException("Buffer size too small. size = " +
171
          throw new IllegalArgumentException("Buffer size too small. size = " +
161
              bufferSize + " needed = " + chunkLength);
172
              bufferSize + " needed = " + chunkLength);
162
        }
173
        }
163
        boolean isOriginal = (compressed[offsetInCompressed] & 0x01) == 1;
174
        // read 3 bytes already
164
        offsetInCompressed += OutStream.HEADER_SIZE;
175
        if(OutStream.HEADER_SIZE > 3) {

    
   
176
          compressed.position(compressed.position()+(OutStream.HEADER_SIZE-3));

    
   
177
        }

    
   
178
        currentOffset += OutStream.HEADER_SIZE;

    
   
179

   

    
   
180
        ByteBuffer slice = this.slice(chunkLength);

    
   
181

   
165
        if (isOriginal) {
182
        if (isOriginal) {

    
   
183
          uncompressed = slice;
166
          isUncompressedOriginal = true;
184
          isUncompressedOriginal = true;
167
          uncompressed = bytes[currentRange].duplicate();

   
168
          uncompressed.position(offsetInCompressed -

   
169
              bytes[currentRange].arrayOffset());

   
170
          uncompressed.limit(offsetInCompressed + chunkLength);

   
171
        } else {
185
        } else {
172
          if (isUncompressedOriginal) {
186
          if (isUncompressedOriginal) {
173
            uncompressed = ByteBuffer.allocate(bufferSize);
187
            uncompressed = allocateBuffer(bufferSize);
174
            isUncompressedOriginal = false;
188
            isUncompressedOriginal = false;
175
          } else if (uncompressed == null) {
189
          } else if (uncompressed == null) {
176
            uncompressed = ByteBuffer.allocate(bufferSize);
190
            uncompressed = allocateBuffer(bufferSize);
177
          } else {
191
          } else {
178
            uncompressed.clear();
192
            uncompressed.clear();
179
          }
193
          }
180
          codec.decompress(ByteBuffer.wrap(compressed, offsetInCompressed,
194
          codec.decompress(slice, uncompressed);
181
              chunkLength),

   
182
            uncompressed);

   
183
        }
195
        }
184
        offsetInCompressed += chunkLength;

   
185
        currentOffset += chunkLength + OutStream.HEADER_SIZE;

   
186
      } else {
196
      } else {
187
        throw new IllegalStateException("Can't read header at " + this);
197
        throw new IllegalStateException("Can't read header at " + this);
188
      }
198
      }
189
    }
199
    }
190

    
   
200

   
[+20] [20] 15 lines
[+20] [+] public int read(byte[] data, int offset, int length) throws IOException {
206
          return -1;
216
          return -1;
207
        }
217
        }
208
        readHeader();
218
        readHeader();
209
      }
219
      }
210
      int actualLength = Math.min(length, uncompressed.remaining());
220
      int actualLength = Math.min(length, uncompressed.remaining());
211
      System.arraycopy(uncompressed.array(),
221
      uncompressed.get(data, offset, actualLength);
212
        uncompressed.arrayOffset() + uncompressed.position(), data,

   
213
        offset, actualLength);

   
214
      uncompressed.position(uncompressed.position() + actualLength);

   
215
      return actualLength;
222
      return actualLength;
216
    }
223
    }
217

    
   
224

   
218
    @Override
225
    @Override
219
    public int available() throws IOException {
226
    public int available() throws IOException {
[+20] [20] 7 lines
[+20] public int read(byte[] data, int offset, int length) throws IOException {
227
    }
234
    }
228

    
   
235

   
229
    @Override
236
    @Override
230
    public void close() {
237
    public void close() {
231
      uncompressed = null;
238
      uncompressed = null;

    
   
239
      compressed = null;
232
      currentRange = bytes.length;
240
      currentRange = bytes.length;
233
      offsetInCompressed = 0;

   
234
      limitInCompressed = 0;

   
235
      currentOffset = length;
241
      currentOffset = length;

    
   
242
      for(int i = 0; i < bytes.length; i++) {

    
   
243
        bytes[i] = null;

    
   
244
      }
236
    }
245
    }
237

    
   
246

   
238
    @Override
247
    @Override
239
    public void seek(PositionProvider index) throws IOException {
248
    public void seek(PositionProvider index) throws IOException {
240
      seek(index.getNext());
249
      seek(index.getNext());
[+20] [20] 6 lines
[+20] public void seek(PositionProvider index) throws IOException {
247
        // mark the uncompressed buffer as done
256
        // mark the uncompressed buffer as done
248
        uncompressed.position(uncompressed.limit());
257
        uncompressed.position(uncompressed.limit());
249
      }
258
      }
250
    }
259
    }
251

    
   
260

   

    
   
261
    /* slices a read only contigous buffer of chunkLength */

    
   
262
    private ByteBuffer slice(int chunkLength) throws IOException {

    
   
263
      int len = chunkLength;

    
   
264
      final long oldOffset = currentOffset;

    
   
265
      ByteBuffer slice;

    
   
266
      if (compressed.remaining() >= len) {

    
   
267
        slice = compressed.slice();

    
   
268
        // simple case

    
   
269
        slice.limit(len);

    
   
270
        currentOffset += len;

    
   
271
        compressed.position(compressed.position() + len);

    
   
272
        return slice;

    
   
273
      } else if (currentRange >= (bytes.length - 1)) {

    
   
274
        // nothing has been modified yet

    
   
275
        throw new IOException("EOF in " + this + " while trying to read " +

    
   
276
            chunkLength + " bytes");

    
   
277
      }

    
   
278

   

    
   
279
      // we need to consolidate 2 or more buffers into 1

    
   
280
      // first clear out compressed buffers

    
   
281
      ByteBuffer copy = allocateBuffer(chunkLength);

    
   
282
      currentOffset += compressed.remaining();

    
   
283
      len -= compressed.remaining();

    
   
284
      copy.put(compressed);

    
   
285

   

    
   
286
      while (len > 0 && (++currentRange) < bytes.length) {

    
   
287
        compressed = bytes[currentRange].duplicate();

    
   
288
        if (compressed.remaining() >= len) {

    
   
289
          slice = compressed.slice();

    
   
290
          slice.limit(len);

    
   
291
          copy.put(slice);

    
   
292
          currentOffset += len;

    
   
293
          compressed.position(compressed.position() + len);

    
   
294
          return copy;

    
   
295
        }

    
   
296
        currentOffset += compressed.remaining();

    
   
297
        len -= compressed.remaining();

    
   
298
        copy.put(compressed);

    
   
299
      }

    
   
300

   

    
   
301
      // restore offsets for exception clarity

    
   
302
      seek(oldOffset);

    
   
303
      throw new IOException("EOF in " + this + " while trying to read " +

    
   
304
          chunkLength + " bytes");

    
   
305
    }

    
   
306

   
252
    private void seek(long desired) throws IOException {
307
    private void seek(long desired) throws IOException {
253
      for(int i = 0; i < bytes.length; ++i) {
308
      for(int i = 0; i < bytes.length; ++i) {
254
        if (offsets[i] <= desired &&
309
        if (offsets[i] <= desired &&
255
            desired - offsets[i] < bytes[i].remaining()) {
310
            desired - offsets[i] < bytes[i].remaining()) {
256
          currentRange = i;
311
          currentRange = i;
257
          compressed = bytes[i].array();
312
          compressed = bytes[i].duplicate();
258
          offsetInCompressed = (int) (bytes[i].arrayOffset() +
313
          int pos = compressed.position();
259
              bytes[i].position() + (desired - offsets[i]));
314
          pos += (int)(desired - offsets[i]);

    
   
315
          compressed.position(pos);
260
          currentOffset = desired;
316
          currentOffset = desired;
261
          limitInCompressed = bytes[i].arrayOffset() + bytes[i].limit();

   
262
          return;
317
          return;
263
        }
318
        }
264
      }
319
      }
265
      // if they are seeking to the precise end, go ahead and let them go there
320
      // if they are seeking to the precise end, go ahead and let them go there
266
      int segments = bytes.length;
321
      int segments = bytes.length;
267
      if (segments != 0 &&
322
      if (segments != 0 &&
268
          desired == offsets[segments - 1] + bytes[segments - 1].remaining()) {
323
          desired == offsets[segments - 1] + bytes[segments - 1].remaining()) {
269
        currentRange = segments - 1;
324
        currentRange = segments - 1;
270
        compressed = bytes[currentRange].array();
325
        compressed = bytes[currentRange].duplicate();
271
        offsetInCompressed = bytes[currentRange].arrayOffset() +
326
        compressed.position(compressed.limit());
272
          bytes[currentRange].limit();

   
273
        currentOffset = desired;
327
        currentOffset = desired;
274
        limitInCompressed = offsetInCompressed;

   
275
        return;
328
        return;
276
      }
329
      }
277
      throw new IOException("Seek outside of data in " + this + " to " +
330
      throw new IOException("Seek outside of data in " + this + " to " +
278
        desired);
331
        desired);
279
    }
332
    }
[+20] [20] 12 lines
[+20] [+] private String rangeString() {
292

    
   
345

   
293
    @Override
346
    @Override
294
    public String toString() {
347
    public String toString() {
295
      return "compressed stream " + name + " position: " + currentOffset +
348
      return "compressed stream " + name + " position: " + currentOffset +
296
          " length: " + length + " range: " + currentRange +
349
          " length: " + length + " range: " + currentRange +
297
          " offset: " + offsetInCompressed + " limit: " + limitInCompressed +
350
          " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
298
          rangeString() +
351
          rangeString() +
299
          (uncompressed == null ? "" :
352
          (uncompressed == null ? "" :
300
              " uncompressed: " + uncompressed.position() + " to " +
353
              " uncompressed: " + uncompressed.position() + " to " +
301
                  uncompressed.limit());
354
                  uncompressed.limit());
302
    }
355
    }
[+20] [20] 30 lines
ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
Revision 71484a3 New Change
 
  1. ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java: Loading...
  2. ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java: Loading...