Review Board 1.7.22


Fix for HBASE-5717: output scan metrics even when scanner doesn't run through all available rows.

Review Request #4640 - Created April 4, 2012 and updated

Ian Varley
HBASE-5717
Reviewers
hbase
hbase
Fix for persistence of scan metrics when the scanner doesn't run to exhaustion.
Altered the scan metrics unit test to show this problem (now fails without changes to ClientScanner.java).

Diff revision 1

This is not the most recent revision of the diff. The latest diff is revision 3. See what's changed.

1 2 3
1 2 3

  1. /src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java: Loading...
  2. /src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java: Loading...
/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
Revision 1309585 New Change
[20] 225 lines
[+20] [+] protected ScannerCallable getScannerCallable(byte [] localStartKey,
226
      s.setCaching(nbRows);
226
      s.setCaching(nbRows);
227
      return s;
227
      return s;
228
    }
228
    }
229

    
   
229

   
230
    /**
230
    /**
231
     * publish the scan metrics
231
     * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
232
     * For now, we use scan.setAttribute to pass the metrics for application
232
     * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
233
     * or TableInputFormat to consume
233
     * framework because it doesn't support multi-instances of the same metrics on the same machine;
234
     * Later, we could push it to other systems
234
     * for scan/map reduce scenarios, we will have multiple scans running at the same time.
235
     * We don't use metrics framework because it doesn't support
235
     * 
236
     * multi instances of the same metrics on the same machine; for scan/map
236
     * By default, scan metrics are disabled; if the application wants to collect them, this behavior
237
     * reduce scenarios, we will have multiple scans running at the same time
237
     * can be turned on by calling calling:

    
   
238
     * 

    
   
239
     * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
238
     */
240
     */
239
    private void writeScanMetrics() throws IOException
241
    private void writeScanMetrics() throws IOException {
240
    {

   
241
      // by default, scanMetrics is null

   
242
      // if application wants to collect scanMetrics, it can turn it on by

   
243
      // calling scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE,

   
244
      // Bytes.toBytes(Boolean.TRUE))

   
245
      if (this.scanMetrics == null) {
242
      if (this.scanMetrics == null) {
246
        return;
243
        return;
247
      }
244
      }
248
      final DataOutputBuffer d = new DataOutputBuffer();
245
      final DataOutputBuffer d = new DataOutputBuffer();
249
      scanMetrics.write(d);
246
      scanMetrics.write(d);
250
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, d.getData());
247
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, d.getData());
251
    }
248
    }
252

    
   
249

   
253
    public Result next() throws IOException {
250
    public Result next() throws IOException {
254
      // If the scanner is closed but there is some rows left in the cache,
251
      // If the scanner is closed and there's nothing left in the cache, next is a no-op.
255
      // it will first empty it before returning null

   
256
      if (cache.size() == 0 && this.closed) {
252
      if (cache.size() == 0 && this.closed) {
257
        writeScanMetrics();

   
258
        return null;
253
        return null;
259
      }
254
      }
260
      if (cache.size() == 0) {
255
      if (cache.size() == 0) {
261
        Result [] values = null;
256
        Result [] values = null;
262
        long remainingResultSize = maxScannerResultSize;
257
        long remainingResultSize = maxScannerResultSize;
[+20] [20] 51 lines
[+20] private void writeScanMetrics() throws IOException private void writeScanMetrics() throws IOException {
314
            this.currentRegion = null;
309
            this.currentRegion = null;
315
            continue;
310
            continue;
316
          }
311
          }
317
          long currentTime = System.currentTimeMillis();
312
          long currentTime = System.currentTimeMillis();
318
          if (this.scanMetrics != null ) {
313
          if (this.scanMetrics != null ) {
319
            this.scanMetrics.sumOfMillisSecBetweenNexts.inc(
314
            this.scanMetrics.sumOfMillisSecBetweenNexts.inc(currentTime-lastNext);
320
              currentTime-lastNext);

   
321
          }
315
          }
322
          lastNext = currentTime;
316
          lastNext = currentTime;
323
          if (values != null && values.length > 0) {
317
          if (values != null && values.length > 0) {
324
            for (Result rs : values) {
318
            for (Result rs : values) {
325
              cache.add(rs);
319
              cache.add(rs);
[+20] [20] 9 lines
[+20] private void writeScanMetrics() throws IOException private void writeScanMetrics() throws IOException {
335
      }
329
      }
336

    
   
330

   
337
      if (cache.size() > 0) {
331
      if (cache.size() > 0) {
338
        return cache.poll();
332
        return cache.poll();
339
      }
333
      }

    
   
334

   

    
   
335
      // if we exhausted this scanner before calling close, write out the scan metrics
340
      writeScanMetrics();
336
      writeScanMetrics();
341
      return null;
337
      return null;
342
    }
338
    }
343

    
   
339

   
344
    /**
340
    /**
[+20] [20] 22 lines
[+20] private void writeScanMetrics() throws IOException private void writeScanMetrics() throws IOException {
367
    public void close() {
363
    public void close() {
368
      if (callable != null) {
364
      if (callable != null) {
369
        callable.setClose();
365
        callable.setClose();
370
        try {
366
        try {
371
          callable.withRetries();
367
          callable.withRetries();

    
   
368
          writeScanMetrics();
372
        } catch (IOException e) {
369
        } catch (IOException e) {
373
          // We used to catch this error, interpret, and rethrow. However, we
370
          // We used to catch this error, interpret, and rethrow. However, we
374
          // have since decided that it's not nice for a scanner's close to
371
          // have since decided that it's not nice for a scanner's close to
375
          // throw exceptions. Chances are it was just an UnknownScanner
372
          // throw exceptions. Chances are it was just an UnknownScanner
376
          // exception due to lease time out.
373
          // exception due to lease time out.
377
        }
374
        }
378
        callable = null;
375
        callable = null;
379
      }
376
      }
380
      closed = true;
377
      closed = true;
381
    }
378
    }
382
}
379
}
/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
Revision 1309585 New Change
 
  1. /src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java: Loading...
  2. /src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java: Loading...