Review Board 1.7.22


HBASE-4145 Provide metrics for hbase client

Review Request #1674 - Created Aug. 29, 2011 and updated

Ming Ma
trunk
hbase-4145
Reviewers
hbase
hbase
1. Collect client-side scan related metrics during scan operation. It is turned off by default.
2. TableInputFormat enables metrics collection on scan and passes the data to the mapreduce framework. It only works with the new mapreduce APIs that allow TableInputFormat to get access to the mapreduce Counter.
3. Clean up some minor issues in TableInputFormat as well as test code.
1. Verified on a small cluster.
2. Existing unit tests.
3. Added new tests.
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
Revision 1176942 New Change
1
/**
1
/**
2
 * Copyright 2010 The Apache Software Foundation
2
 * Copyright 2011 The Apache Software Foundation
3
 *
3
 *
4
 * Licensed to the Apache Software Foundation (ASF) under one
4
 * Licensed to the Apache Software Foundation (ASF) under one
5
 * or more contributor license agreements.  See the NOTICE file
5
 * or more contributor license agreements.  See the NOTICE file
6
 * distributed with this work for additional information
6
 * distributed with this work for additional information
7
 * regarding copyright ownership.  The ASF licenses this file
7
 * regarding copyright ownership.  The ASF licenses this file
[+20] [20] 46 lines
[+20]
54
import org.apache.hadoop.hbase.ServerName;
54
import org.apache.hadoop.hbase.ServerName;
55
import org.apache.hadoop.hbase.UnknownScannerException;
55
import org.apache.hadoop.hbase.UnknownScannerException;
56
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
56
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
57
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
57
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
58
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
58
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;

    
   
59
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
59
import org.apache.hadoop.hbase.client.coprocessor.Batch;
60
import org.apache.hadoop.hbase.client.coprocessor.Batch;
60
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
61
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
61
import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
62
import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
62
import org.apache.hadoop.hbase.util.Addressing;
63
import org.apache.hadoop.hbase.util.Addressing;
63
import org.apache.hadoop.hbase.util.Bytes;
64
import org.apache.hadoop.hbase.util.Bytes;
64
import org.apache.hadoop.hbase.util.Pair;
65
import org.apache.hadoop.hbase.util.Pair;
65
import org.apache.hadoop.hbase.util.Writables;
66
import org.apache.hadoop.hbase.util.Writables;

    
   
67
import org.apache.hadoop.io.DataOutputBuffer;
66

    
   
68

   
67
/**
69
/**
68
 * <p>Used to communicate with a single HBase table.
70
 * <p>Used to communicate with a single HBase table.
69
 *
71
 *
70
 * <p>This class is not thread safe for reads nor write.
72
 * <p>This class is not thread safe for reads nor write.
[+20] [20] 961 lines
[+20] [+] protected class ClientScanner implements ResultScanner {
1032
    private final LinkedList<Result> cache = new LinkedList<Result>();
1034
    private final LinkedList<Result> cache = new LinkedList<Result>();
1033
    private final int caching;
1035
    private final int caching;
1034
    private long lastNext;
1036
    private long lastNext;
1035
    // Keep lastResult returned successfully in case we have to reset scanner.
1037
    // Keep lastResult returned successfully in case we have to reset scanner.
1036
    private Result lastResult = null;
1038
    private Result lastResult = null;

    
   
1039
    private ScanMetrics scanMetrics = null;
1037

    
   
1040

   
1038
    protected ClientScanner(final Scan scan) {
1041
    protected ClientScanner(final Scan scan) {
1039
      if (CLIENT_LOG.isDebugEnabled()) {
1042
      if (CLIENT_LOG.isDebugEnabled()) {
1040
        CLIENT_LOG.debug("Creating scanner over "
1043
        CLIENT_LOG.debug("Creating scanner over "
1041
            + Bytes.toString(getTableName())
1044
            + Bytes.toString(getTableName())
1042
            + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
1045
            + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
1043
      }
1046
      }
1044
      this.scan = scan;
1047
      this.scan = scan;
1045
      this.lastNext = System.currentTimeMillis();
1048
      this.lastNext = System.currentTimeMillis();
1046

    
   
1049

   

    
   
1050
      // check if application wants to collect scan metrics

    
   
1051
      byte[] enableMetrics = scan.getAttribute(

    
   
1052
        Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);

    
   
1053
      if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {

    
   
1054
        scanMetrics = new ScanMetrics();

    
   
1055
      }

    
   
1056

   
1047
      // Use the caching from the Scan.  If not set, use the default cache setting for this table.
1057
      // Use the caching from the Scan.  If not set, use the default cache setting for this table.
1048
      if (this.scan.getCaching() > 0) {
1058
      if (this.scan.getCaching() > 0) {
1049
        this.caching = this.scan.getCaching();
1059
        this.caching = this.scan.getCaching();
1050
      } else {
1060
      } else {
1051
        this.caching = HTable.this.scannerCaching;
1061
        this.caching = HTable.this.scannerCaching;
[+20] [20] 85 lines
[+20] [+] private boolean nextScanner(int nbRows, final boolean done)
1137
        callable = getScannerCallable(localStartKey, nbRows);
1147
        callable = getScannerCallable(localStartKey, nbRows);
1138
        // Open a scanner on the region server starting at the
1148
        // Open a scanner on the region server starting at the
1139
        // beginning of the region
1149
        // beginning of the region
1140
        getConnection().getRegionServerWithRetries(callable);
1150
        getConnection().getRegionServerWithRetries(callable);
1141
        this.currentRegion = callable.getHRegionInfo();
1151
        this.currentRegion = callable.getHRegionInfo();

    
   
1152
        if (this.scanMetrics != null) {

    
   
1153
          this.scanMetrics.countOfRegions.inc();

    
   
1154
        }
1142
      } catch (IOException e) {
1155
      } catch (IOException e) {
1143
        close();
1156
        close();
1144
        throw e;
1157
        throw e;
1145
      }
1158
      }
1146
      return true;
1159
      return true;
1147
    }
1160
    }
1148

    
   
1161

   
1149
    protected ScannerCallable getScannerCallable(byte [] localStartKey,
1162
    protected ScannerCallable getScannerCallable(byte [] localStartKey,
1150
        int nbRows) {
1163
        int nbRows) {
1151
      scan.setStartRow(localStartKey);
1164
      scan.setStartRow(localStartKey);
1152
      ScannerCallable s = new ScannerCallable(getConnection(),
1165
      ScannerCallable s = new ScannerCallable(getConnection(),
1153
        getTableName(), scan);
1166
        getTableName(), scan, this.scanMetrics);
1154
      s.setCaching(nbRows);
1167
      s.setCaching(nbRows);
1155
      return s;
1168
      return s;
1156
    }
1169
    }
1157

    
   
1170

   

    
   
1171
    /**

    
   
1172
     * publish the scan metrics

    
   
1173
     * For now, we use scan.setAttribute to pass the metrics for application

    
   
1174
     * or TableInputFormat to consume

    
   
1175
     * Later, we could push it to other systems

    
   
1176
     * We don't use metrics framework because it doesn't support

    
   
1177
     * multi instances of the same metrics on the same machine; for scan/map

    
   
1178
     * reduce scenarios, we will have multiple scans running at the same time

    
   
1179
     */

    
   
1180
    private void writeScanMetrics() throws IOException

    
   
1181
    {

    
   
1182
      // by default, scanMetrics is null

    
   
1183
      // if application wants to collect scanMetrics, it can turn it on by

    
   
1184
      // calling scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE,

    
   
1185
      // Bytes.toBytes(Boolean.TRUE))

    
   
1186
      if (this.scanMetrics == null) {

    
   
1187
        return;

    
   
1188
      }

    
   
1189
      final DataOutputBuffer d = new DataOutputBuffer();

    
   
1190
      scanMetrics.write(d);

    
   
1191
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, d.getData());

    
   
1192
    }

    
   
1193

   
1158
    public Result next() throws IOException {
1194
    public Result next() throws IOException {
1159
      // If the scanner is closed but there is some rows left in the cache,
1195
      // If the scanner is closed but there is some rows left in the cache,
1160
      // it will first empty it before returning null
1196
      // it will first empty it before returning null
1161
      if (cache.size() == 0 && this.closed) {
1197
      if (cache.size() == 0 && this.closed) {

    
   
1198
        writeScanMetrics();
1162
        return null;
1199
        return null;
1163
      }
1200
      }
1164
      if (cache.size() == 0) {
1201
      if (cache.size() == 0) {
1165
        Result [] values = null;
1202
        Result [] values = null;
1166
        long remainingResultSize = maxScannerResultSize;
1203
        long remainingResultSize = maxScannerResultSize;
[+20] [20] 48 lines
[+20] public Result next() throws IOException {
1215
            }
1252
            }
1216
            // Clear region
1253
            // Clear region
1217
            this.currentRegion = null;
1254
            this.currentRegion = null;
1218
            continue;
1255
            continue;
1219
          }
1256
          }
1220
          lastNext = System.currentTimeMillis();
1257
          long currentTime = System.currentTimeMillis();

    
   
1258
          if (this.scanMetrics != null ) {

    
   
1259
            this.scanMetrics.sumOfMillisSecBetweenNexts.inc(

    
   
1260
              currentTime-lastNext);

    
   
1261
          }

    
   
1262
          lastNext = currentTime;
1221
          if (values != null && values.length > 0) {
1263
          if (values != null && values.length > 0) {
1222
            for (Result rs : values) {
1264
            for (Result rs : values) {
1223
              cache.add(rs);
1265
              cache.add(rs);
1224
              for (KeyValue kv : rs.raw()) {
1266
              for (KeyValue kv : rs.raw()) {
1225
                  remainingResultSize -= kv.heapSize();
1267
                  remainingResultSize -= kv.heapSize();
[+20] [20] 7 lines
[+20] public Result next() throws IOException {
1233
      }
1275
      }
1234

    
   
1276

   
1235
      if (cache.size() > 0) {
1277
      if (cache.size() > 0) {
1236
        return cache.poll();
1278
        return cache.poll();
1237
      }
1279
      }

    
   
1280
      writeScanMetrics();
1238
      return null;
1281
      return null;
1239
    }
1282
    }
1240

    
   
1283

   
1241
    /**
1284
    /**
1242
     * Get <param>nbRows</param> rows.
1285
     * Get <param>nbRows</param> rows.
[+20] [20] 294 lines
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
Revision 1176942 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java
Revision 1176942 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
Revision 1176942 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
New File
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
Revision 1176942 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
Revision 1176942 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
Revision 1176942 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
Revision 1176942 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
Revision 1176942 New Change
 
  1. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java: Loading...
  2. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java: Loading...
  3. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java: Loading...
  4. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java: Loading...
  5. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java: Loading...
  6. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java: Loading...
  7. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java: Loading...
  8. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java: Loading...
  9. http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java: Loading...
  10. http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java: Loading...