Review Board 1.7.22


HBASE-2214 per scan max buffersize

Review Request #4726 - Created April 15, 2012 and updated

ferdy
0.94
HBASE-2214
Reviewers
hbase
tedyu
hbase
HBASE-2214 per scan max buffersize.
It works when running this test:



    new HBaseTestingUtility(conf).startMiniCluster();
 
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (!admin.tableExists("test")) {
      HTableDescriptor tableDesc = new HTableDescriptor("test");
      tableDesc.addFamily(new HColumnDescriptor("fam"));
      admin.createTable(tableDesc);
    }
    
    
    HTable table = new HTable(conf, "test");
    Put put; 
    
    put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("fam"),Bytes.toBytes("qual1"),Bytes.toBytes("val1"));
    table.put(put);
    
    put = new Put(Bytes.toBytes("row2"));
    put.add(Bytes.toBytes("fam"),Bytes.toBytes("qual2"),Bytes.toBytes("val2"));
    table.put(put);
    
    put = new Put(Bytes.toBytes("row3"));
    put.add(Bytes.toBytes("fam"),Bytes.toBytes("qual3"),Bytes.toBytes("val3"));
    table.put(put);
    
    table.flushCommits();
    //put a logging statement to ClientScanner#next() to see the effect.
    {
      System.out.println("returns all rows at once because of the caching");
      Scan scan = new Scan();
      scan.setCaching(100);
      ResultScanner scanner = table.getScanner(scan);
      scanner.next(100);
    }
    {
      System.out.println("returns one row at a time because of the maxResultSize");
      Scan scan = new Scan();
      scan.setCaching(100);
      scan.setMaxResultSize(1);
      ResultScanner scanner = table.getScanner(scan);
      scanner.next(100);
    }



See output:

returns all rows at once because of the caching
2012-04-25 22:18:47,494 DEBUG [main] client.ClientScanner(94): Creating scanner over test starting at key ''
2012-04-25 22:18:47,494 DEBUG [main] client.ClientScanner(206): Advancing internal scanner to startKey at ''
2012-04-25 22:18:47,499 DEBUG [main] client.ClientScanner(323): Rows returned 3
2012-04-25 22:18:47,502 DEBUG [main] client.ClientScanner(193): Finished with scanning at {NAME => 'test,,1335385126388.ed23a82f3d6ca2eab571918843796259.', STARTKEY => '', ENDKEY => '', ENCODED => ed23a82f3d6ca2eab571918843796259,}
returns one row at a time because of the maxResultSize
2012-04-25 22:18:47,504 DEBUG [main] client.ClientScanner(94): Creating scanner over test starting at key ''
2012-04-25 22:18:47,505 DEBUG [main] client.ClientScanner(206): Advancing internal scanner to startKey at ''
2012-04-25 22:18:47,514 DEBUG [main] client.ClientScanner(323): Rows returned 1
2012-04-25 22:18:47,517 DEBUG [main] client.ClientScanner(323): Rows returned 1
2012-04-25 22:18:47,522 DEBUG [main] client.ClientScanner(323): Rows returned 1
/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
Revision 1330680 New Change
1
/**
1
/**
2
 * Licensed to the Apache Software Foundation (ASF) under one
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
8
 * with the License.  You may obtain a copy of the License at
9
 *
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
16
 * limitations under the License.
17
 */
17
 */
18
package org.apache.hadoop.hbase.client;
18
package org.apache.hadoop.hbase.client;
19

    
   
19

   
20
import java.io.IOException;
20
import java.io.IOException;
21
import java.util.ArrayList;
21
import java.util.ArrayList;
22
import java.util.LinkedList;
22
import java.util.LinkedList;
23

    
   
23

   
24
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.Log;
25
import org.apache.commons.logging.LogFactory;
25
import org.apache.commons.logging.LogFactory;
26
import org.apache.hadoop.classification.InterfaceAudience;
26
import org.apache.hadoop.classification.InterfaceAudience;
27
import org.apache.hadoop.classification.InterfaceStability;
27
import org.apache.hadoop.classification.InterfaceStability;
28
import org.apache.hadoop.conf.Configuration;
28
import org.apache.hadoop.conf.Configuration;
29
import org.apache.hadoop.hbase.DoNotRetryIOException;
29
import org.apache.hadoop.hbase.DoNotRetryIOException;
30
import org.apache.hadoop.hbase.HConstants;
30
import org.apache.hadoop.hbase.HConstants;
31
import org.apache.hadoop.hbase.HRegionInfo;
31
import org.apache.hadoop.hbase.HRegionInfo;
32
import org.apache.hadoop.hbase.KeyValue;
32
import org.apache.hadoop.hbase.KeyValue;
33
import org.apache.hadoop.hbase.NotServingRegionException;
33
import org.apache.hadoop.hbase.NotServingRegionException;
34
import org.apache.hadoop.hbase.UnknownScannerException;
34
import org.apache.hadoop.hbase.UnknownScannerException;
35
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
35
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
36
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
36
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
37
import org.apache.hadoop.hbase.util.Bytes;
37
import org.apache.hadoop.hbase.util.Bytes;
38
import org.apache.hadoop.io.DataOutputBuffer;
38
import org.apache.hadoop.io.DataOutputBuffer;
39

    
   
39

   
40
/**
40
/**
41
 * Implements the scanner interface for the HBase client.
41
 * Implements the scanner interface for the HBase client.
42
 * If there are multiple regions in a table, this scanner will iterate
42
 * If there are multiple regions in a table, this scanner will iterate
43
 * through them all.
43
 * through them all.
44
 */
44
 */
45
@InterfaceAudience.Public
45
@InterfaceAudience.Public
46
@InterfaceStability.Stable
46
@InterfaceStability.Stable
47
public class ClientScanner extends AbstractClientScanner {
47
public class ClientScanner extends AbstractClientScanner {
48
    private final Log LOG = LogFactory.getLog(this.getClass());
48
    private final Log LOG = LogFactory.getLog(this.getClass());
49
    private Scan scan;
49
    private Scan scan;
50
    private boolean closed = false;
50
    private boolean closed = false;
51
    // Current region scanner is against.  Gets cleared if current region goes
51
    // Current region scanner is against.  Gets cleared if current region goes
52
    // wonky: e.g. if it splits on us.
52
    // wonky: e.g. if it splits on us.
53
    private HRegionInfo currentRegion = null;
53
    private HRegionInfo currentRegion = null;
54
    private ScannerCallable callable = null;
54
    private ScannerCallable callable = null;
55
    private final LinkedList<Result> cache = new LinkedList<Result>();
55
    private final LinkedList<Result> cache = new LinkedList<Result>();
56
    private final int caching;
56
    private final int caching;
57
    private long lastNext;
57
    private long lastNext;
58
    // Keep lastResult returned successfully in case we have to reset scanner.
58
    // Keep lastResult returned successfully in case we have to reset scanner.
59
    private Result lastResult = null;
59
    private Result lastResult = null;
60
    private ScanMetrics scanMetrics = null;
60
    private ScanMetrics scanMetrics = null;
61
    private final long maxScannerResultSize;
61
    private final long maxScannerResultSize;
62
    private final HConnection connection;
62
    private final HConnection connection;
63
    private final byte[] tableName;
63
    private final byte[] tableName;
64
    private final int scannerTimeout;
64
    private final int scannerTimeout;
65

    
   
65

   
66
    /**
66
    /**
67
     * Create a new ClientScanner for the specified table. An HConnection will be
67
     * Create a new ClientScanner for the specified table. An HConnection will be
68
     * retrieved using the passed Configuration.
68
     * retrieved using the passed Configuration.
69
     * Note that the passed {@link Scan}'s start row maybe changed changed. 
69
     * Note that the passed {@link Scan}'s start row maybe changed changed. 
70
     * 
70
     * 
71
     * @param conf The {@link Configuration} to use.
71
     * @param conf The {@link Configuration} to use.
72
     * @param scan {@link Scan} to use in this scanner
72
     * @param scan {@link Scan} to use in this scanner
73
     * @param tableName The table that we wish to scan
73
     * @param tableName The table that we wish to scan
74
     * @throws IOException
74
     * @throws IOException
75
     */
75
     */
76
    public ClientScanner(final Configuration conf, final Scan scan,
76
    public ClientScanner(final Configuration conf, final Scan scan,
77
        final byte[] tableName) throws IOException {
77
        final byte[] tableName) throws IOException {
78
      this(conf, scan, tableName, HConnectionManager.getConnection(conf));
78
      this(conf, scan, tableName, HConnectionManager.getConnection(conf));
79
    }
79
    }
80
 
80
 
81
    /**
81
    /**
82
     * Create a new ClientScanner for the specified table
82
     * Create a new ClientScanner for the specified table
83
     * Note that the passed {@link Scan}'s start row maybe changed changed. 
83
     * Note that the passed {@link Scan}'s start row maybe changed changed. 
84
     * 
84
     * 
85
     * @param conf The {@link Configuration} to use.
85
     * @param conf The {@link Configuration} to use.
86
     * @param scan {@link Scan} to use in this scanner
86
     * @param scan {@link Scan} to use in this scanner
87
     * @param tableName The table that we wish to scan
87
     * @param tableName The table that we wish to scan
88
     * @param connection Connection identifying the cluster
88
     * @param connection Connection identifying the cluster
89
     * @throws IOException
89
     * @throws IOException
90
     */
90
     */
91
    public ClientScanner(final Configuration conf, final Scan scan,
91
    public ClientScanner(final Configuration conf, final Scan scan,
92
      final byte[] tableName, HConnection connection) throws IOException {
92
      final byte[] tableName, HConnection connection) throws IOException {
93
      if (LOG.isDebugEnabled()) {
93
      if (LOG.isDebugEnabled()) {
94
        LOG.debug("Creating scanner over "
94
        LOG.debug("Creating scanner over "
95
            + Bytes.toString(tableName)
95
            + Bytes.toString(tableName)
96
            + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
96
            + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
97
      }
97
      }
98
      this.scan = scan;
98
      this.scan = scan;
99
      this.tableName = tableName;
99
      this.tableName = tableName;
100
      this.lastNext = System.currentTimeMillis();
100
      this.lastNext = System.currentTimeMillis();
101
      this.connection = connection;
101
      this.connection = connection;

    
   
102
      if (scan.getMaxResultSize() > 0) {

    
   
103
        this.maxScannerResultSize = scan.getMaxResultSize();

    
   
104
      } else {
102
      this.maxScannerResultSize = conf.getLong(
105
        this.maxScannerResultSize = conf.getLong(
103
          HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
106
          HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
104
          HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
107
          HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);

    
   
108
      }
105
      this.scannerTimeout = (int) conf.getLong(
109
      this.scannerTimeout = (int) conf.getLong(
106
          HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
110
          HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
107
          HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
111
          HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
108

    
   
112

   
109
      // check if application wants to collect scan metrics
113
      // check if application wants to collect scan metrics
110
      byte[] enableMetrics = scan.getAttribute(
114
      byte[] enableMetrics = scan.getAttribute(
111
        Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
115
        Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
112
      if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
116
      if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
113
        scanMetrics = new ScanMetrics();
117
        scanMetrics = new ScanMetrics();
114
      }
118
      }
115

    
   
119

   
116
      // Use the caching from the Scan.  If not set, use the default cache setting for this table.
120
      // Use the caching from the Scan.  If not set, use the default cache setting for this table.
117
      if (this.scan.getCaching() > 0) {
121
      if (this.scan.getCaching() > 0) {
118
        this.caching = this.scan.getCaching();
122
        this.caching = this.scan.getCaching();
119
      } else {
123
      } else {
120
        this.caching = conf.getInt("hbase.client.scanner.caching", 1);
124
        this.caching = conf.getInt("hbase.client.scanner.caching", 1);
121
      }
125
      }
122

    
   
126

   
123
      // initialize the scanner
127
      // initialize the scanner
124
      nextScanner(this.caching, false);
128
      nextScanner(this.caching, false);
125
    }
129
    }
126

    
   
130

   
127
    protected HConnection getConnection() {
131
    protected HConnection getConnection() {
128
      return this.connection;
132
      return this.connection;
129
    }
133
    }
130

    
   
134

   
131
    protected byte[] getTableName() {
135
    protected byte[] getTableName() {
132
      return this.tableName;
136
      return this.tableName;
133
    }
137
    }
134

    
   
138

   
135
    protected Scan getScan() {
139
    protected Scan getScan() {
136
      return scan;
140
      return scan;
137
    }
141
    }
138

    
   
142

   
139
    protected long getTimestamp() {
143
    protected long getTimestamp() {
140
      return lastNext;
144
      return lastNext;
141
    }
145
    }
142

    
   
146

   
143
    // returns true if the passed region endKey
147
    // returns true if the passed region endKey
144
    private boolean checkScanStopRow(final byte [] endKey) {
148
    private boolean checkScanStopRow(final byte [] endKey) {
145
      if (this.scan.getStopRow().length > 0) {
149
      if (this.scan.getStopRow().length > 0) {
146
        // there is a stop row, check to see if we are past it.
150
        // there is a stop row, check to see if we are past it.
147
        byte [] stopRow = scan.getStopRow();
151
        byte [] stopRow = scan.getStopRow();
148
        int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
152
        int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
149
          endKey, 0, endKey.length);
153
          endKey, 0, endKey.length);
150
        if (cmp <= 0) {
154
        if (cmp <= 0) {
151
          // stopRow <= endKey (endKey is equals to or larger than stopRow)
155
          // stopRow <= endKey (endKey is equals to or larger than stopRow)
152
          // This is a stop.
156
          // This is a stop.
153
          return true;
157
          return true;
154
        }
158
        }
155
      }
159
      }
156
      return false; //unlikely.
160
      return false; //unlikely.
157
    }
161
    }
158

    
   
162

   
159
    /*
163
    /*
160
     * Gets a scanner for the next region.  If this.currentRegion != null, then
164
     * Gets a scanner for the next region.  If this.currentRegion != null, then
161
     * we will move to the endrow of this.currentRegion.  Else we will get
165
     * we will move to the endrow of this.currentRegion.  Else we will get
162
     * scanner at the scan.getStartRow().  We will go no further, just tidy
166
     * scanner at the scan.getStartRow().  We will go no further, just tidy
163
     * up outstanding scanners, if <code>currentRegion != null</code> and
167
     * up outstanding scanners, if <code>currentRegion != null</code> and
164
     * <code>done</code> is true.
168
     * <code>done</code> is true.
165
     * @param nbRows
169
     * @param nbRows
166
     * @param done Server-side says we're done scanning.
170
     * @param done Server-side says we're done scanning.
167
     */
171
     */
168
    private boolean nextScanner(int nbRows, final boolean done)
172
    private boolean nextScanner(int nbRows, final boolean done)
169
    throws IOException {
173
    throws IOException {
170
      // Close the previous scanner if it's open
174
      // Close the previous scanner if it's open
171
      if (this.callable != null) {
175
      if (this.callable != null) {
172
        this.callable.setClose();
176
        this.callable.setClose();
173
        callable.withRetries();
177
        callable.withRetries();
174
        this.callable = null;
178
        this.callable = null;
175
      }
179
      }
176

    
   
180

   
177
      // Where to start the next scanner
181
      // Where to start the next scanner
178
      byte [] localStartKey;
182
      byte [] localStartKey;
179

    
   
183

   
180
      // if we're at end of table, close and return false to stop iterating
184
      // if we're at end of table, close and return false to stop iterating
181
      if (this.currentRegion != null) {
185
      if (this.currentRegion != null) {
182
        byte [] endKey = this.currentRegion.getEndKey();
186
        byte [] endKey = this.currentRegion.getEndKey();
183
        if (endKey == null ||
187
        if (endKey == null ||
184
            Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
188
            Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
185
            checkScanStopRow(endKey) ||
189
            checkScanStopRow(endKey) ||
186
            done) {
190
            done) {
187
          close();
191
          close();
188
          if (LOG.isDebugEnabled()) {
192
          if (LOG.isDebugEnabled()) {
189
            LOG.debug("Finished with scanning at " + this.currentRegion);
193
            LOG.debug("Finished with scanning at " + this.currentRegion);
190
          }
194
          }
191
          return false;
195
          return false;
192
        }
196
        }
193
        localStartKey = endKey;
197
        localStartKey = endKey;
194
        if (LOG.isDebugEnabled()) {
198
        if (LOG.isDebugEnabled()) {
195
          LOG.debug("Finished with region " + this.currentRegion);
199
          LOG.debug("Finished with region " + this.currentRegion);
196
        }
200
        }
197
      } else {
201
      } else {
198
        localStartKey = this.scan.getStartRow();
202
        localStartKey = this.scan.getStartRow();
199
      }
203
      }
200

    
   
204

   
201
      if (LOG.isDebugEnabled()) {
205
      if (LOG.isDebugEnabled()) {
202
        LOG.debug("Advancing internal scanner to startKey at '" +
206
        LOG.debug("Advancing internal scanner to startKey at '" +
203
          Bytes.toStringBinary(localStartKey) + "'");
207
          Bytes.toStringBinary(localStartKey) + "'");
204
      }
208
      }
205
      try {
209
      try {
206
        callable = getScannerCallable(localStartKey, nbRows);
210
        callable = getScannerCallable(localStartKey, nbRows);
207
        // Open a scanner on the region server starting at the
211
        // Open a scanner on the region server starting at the
208
        // beginning of the region
212
        // beginning of the region
209
        callable.withRetries();
213
        callable.withRetries();
210
        this.currentRegion = callable.getHRegionInfo();
214
        this.currentRegion = callable.getHRegionInfo();
211
        if (this.scanMetrics != null) {
215
        if (this.scanMetrics != null) {
212
          this.scanMetrics.countOfRegions.inc();
216
          this.scanMetrics.countOfRegions.inc();
213
        }
217
        }
214
      } catch (IOException e) {
218
      } catch (IOException e) {
215
        close();
219
        close();
216
        throw e;
220
        throw e;
217
      }
221
      }
218
      return true;
222
      return true;
219
    }
223
    }
220

    
   
224

   
221
    protected ScannerCallable getScannerCallable(byte [] localStartKey,
225
    protected ScannerCallable getScannerCallable(byte [] localStartKey,
222
        int nbRows) {
226
        int nbRows) {
223
      scan.setStartRow(localStartKey);
227
      scan.setStartRow(localStartKey);
224
      ScannerCallable s = new ScannerCallable(getConnection(),
228
      ScannerCallable s = new ScannerCallable(getConnection(),
225
        getTableName(), scan, this.scanMetrics);
229
        getTableName(), scan, this.scanMetrics);
226
      s.setCaching(nbRows);
230
      s.setCaching(nbRows);
227
      return s;
231
      return s;
228
    }
232
    }
229

    
   
233

   
230
    /**
234
    /**
231
     * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
235
     * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
232
     * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
236
     * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
233
     * framework because it doesn't support multi-instances of the same metrics on the same machine;
237
     * framework because it doesn't support multi-instances of the same metrics on the same machine;
234
     * for scan/map reduce scenarios, we will have multiple scans running at the same time.
238
     * for scan/map reduce scenarios, we will have multiple scans running at the same time.
235
     *
239
     *
236
     * By default, scan metrics are disabled; if the application wants to collect them, this behavior
240
     * By default, scan metrics are disabled; if the application wants to collect them, this behavior
237
     * can be turned on by calling calling:
241
     * can be turned on by calling calling:
238
     *
242
     *
239
     * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
243
     * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
240
     */
244
     */
241
    private void writeScanMetrics() throws IOException {
245
    private void writeScanMetrics() throws IOException {
242
      if (this.scanMetrics == null) {
246
      if (this.scanMetrics == null) {
243
        return;
247
        return;
244
      }
248
      }
245
      final DataOutputBuffer d = new DataOutputBuffer();
249
      final DataOutputBuffer d = new DataOutputBuffer();
246
      scanMetrics.write(d);
250
      scanMetrics.write(d);
247
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, d.getData());
251
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, d.getData());
248
    }
252
    }
249

    
   
253

   
250
    public Result next() throws IOException {
254
    public Result next() throws IOException {
251
      // If the scanner is closed and there's nothing left in the cache, next is a no-op.
255
      // If the scanner is closed and there's nothing left in the cache, next is a no-op.
252
      if (cache.size() == 0 && this.closed) {
256
      if (cache.size() == 0 && this.closed) {
253
        return null;
257
        return null;
254
      }
258
      }
255
      if (cache.size() == 0) {
259
      if (cache.size() == 0) {
256
        Result [] values = null;
260
        Result [] values = null;
257
        long remainingResultSize = maxScannerResultSize;
261
        long remainingResultSize = maxScannerResultSize;
258
        int countdown = this.caching;
262
        int countdown = this.caching;
259
        // We need to reset it if it's a new callable that was created
263
        // We need to reset it if it's a new callable that was created
260
        // with a countdown in nextScanner
264
        // with a countdown in nextScanner
261
        callable.setCaching(this.caching);
265
        callable.setCaching(this.caching);
262
        // This flag is set when we want to skip the result returned.  We do
266
        // This flag is set when we want to skip the result returned.  We do
263
        // this when we reset scanner because it split under us.
267
        // this when we reset scanner because it split under us.
264
        boolean skipFirst = false;
268
        boolean skipFirst = false;
265
        do {
269
        do {
266
          try {
270
          try {
267
            if (skipFirst) {
271
            if (skipFirst) {
268
              // Skip only the first row (which was the last row of the last
272
              // Skip only the first row (which was the last row of the last
269
              // already-processed batch).
273
              // already-processed batch).
270
              callable.setCaching(1);
274
              callable.setCaching(1);
271
              values = callable.withRetries();
275
              values = callable.withRetries();
272
              callable.setCaching(this.caching);
276
              callable.setCaching(this.caching);
273
              skipFirst = false;
277
              skipFirst = false;
274
            }
278
            }
275
            // Server returns a null values if scanning is to stop.  Else,
279
            // Server returns a null values if scanning is to stop.  Else,
276
            // returns an empty array if scanning is to go on and we've just
280
            // returns an empty array if scanning is to go on and we've just
277
            // exhausted current region.
281
            // exhausted current region.
278
            values = callable.withRetries();
282
            values = callable.withRetries();
279
          } catch (DoNotRetryIOException e) {
283
          } catch (DoNotRetryIOException e) {
280
            if (e instanceof UnknownScannerException) {
284
            if (e instanceof UnknownScannerException) {
281
              long timeout = lastNext + scannerTimeout;
285
              long timeout = lastNext + scannerTimeout;
282
              // If we are over the timeout, throw this exception to the client
286
              // If we are over the timeout, throw this exception to the client
283
              // Else, it's because the region moved and we used the old id
287
              // Else, it's because the region moved and we used the old id
284
              // against the new region server; reset the scanner.
288
              // against the new region server; reset the scanner.
285
              if (timeout < System.currentTimeMillis()) {
289
              if (timeout < System.currentTimeMillis()) {
286
                long elapsed = System.currentTimeMillis() - lastNext;
290
                long elapsed = System.currentTimeMillis() - lastNext;
287
                ScannerTimeoutException ex = new ScannerTimeoutException(
291
                ScannerTimeoutException ex = new ScannerTimeoutException(
288
                    elapsed + "ms passed since the last invocation, " +
292
                    elapsed + "ms passed since the last invocation, " +
289
                        "timeout is currently set to " + scannerTimeout);
293
                        "timeout is currently set to " + scannerTimeout);
290
                ex.initCause(e);
294
                ex.initCause(e);
291
                throw ex;
295
                throw ex;
292
              }
296
              }
293
            } else {
297
            } else {
294
              Throwable cause = e.getCause();
298
              Throwable cause = e.getCause();
295
              if (cause == null || (!(cause instanceof NotServingRegionException)
299
              if (cause == null || (!(cause instanceof NotServingRegionException)
296
                  && !(cause instanceof RegionServerStoppedException))) {
300
                  && !(cause instanceof RegionServerStoppedException))) {
297
                throw e;
301
                throw e;
298
              }
302
              }
299
            }
303
            }
300
            // Else, its signal from depths of ScannerCallable that we got an
304
            // Else, its signal from depths of ScannerCallable that we got an
301
            // NSRE on a next and that we need to reset the scanner.
305
            // NSRE on a next and that we need to reset the scanner.
302
            if (this.lastResult != null) {
306
            if (this.lastResult != null) {
303
              this.scan.setStartRow(this.lastResult.getRow());
307
              this.scan.setStartRow(this.lastResult.getRow());
304
              // Skip first row returned.  We already let it out on previous
308
              // Skip first row returned.  We already let it out on previous
305
              // invocation.
309
              // invocation.
306
              skipFirst = true;
310
              skipFirst = true;
307
            }
311
            }
308
            // Clear region
312
            // Clear region
309
            this.currentRegion = null;
313
            this.currentRegion = null;
310
            callable = null;
314
            callable = null;
311
            continue;
315
            continue;
312
          }
316
          }
313
          long currentTime = System.currentTimeMillis();
317
          long currentTime = System.currentTimeMillis();
314
          if (this.scanMetrics != null ) {
318
          if (this.scanMetrics != null ) {
315
            this.scanMetrics.sumOfMillisSecBetweenNexts.inc(currentTime-lastNext);
319
            this.scanMetrics.sumOfMillisSecBetweenNexts.inc(currentTime-lastNext);
316
          }
320
          }
317
          lastNext = currentTime;
321
          lastNext = currentTime;
318
          if (values != null && values.length > 0) {
322
          if (values != null && values.length > 0) {

    
   
323
            if (LOG.isDebugEnabled()) {LOG.debug("Rows returned " + values.length);}
319
            for (Result rs : values) {
324
            for (Result rs : values) {
320
              cache.add(rs);
325
              cache.add(rs);
321
              for (KeyValue kv : rs.raw()) {
326
              for (KeyValue kv : rs.raw()) {
322
                  remainingResultSize -= kv.heapSize();
327
                  remainingResultSize -= kv.heapSize();
323
              }
328
              }
324
              countdown--;
329
              countdown--;
325
              this.lastResult = rs;
330
              this.lastResult = rs;
326
            }
331
            }
327
          }
332
          }
328
          // Values == null means server-side filter has determined we must STOP
333
          // Values == null means server-side filter has determined we must STOP
329
        } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
334
        } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
330
      }
335
      }
331

    
   
336

   
332
      if (cache.size() > 0) {
337
      if (cache.size() > 0) {
333
        return cache.poll();
338
        return cache.poll();
334
      }
339
      }
335

    
   
340

   
336
      // if we exhausted this scanner before calling close, write out the scan metrics
341
      // if we exhausted this scanner before calling close, write out the scan metrics
337
      writeScanMetrics();
342
      writeScanMetrics();
338
      return null;
343
      return null;
339
    }
344
    }
340

    
   
345

   
341
    /**
346
    /**
342
     * Get <param>nbRows</param> rows.
347
     * Get <param>nbRows</param> rows.
343
     * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
348
     * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
344
     * setting (or hbase.client.scanner.caching in hbase-site.xml).
349
     * setting (or hbase.client.scanner.caching in hbase-site.xml).
345
     * @param nbRows number of rows to return
350
     * @param nbRows number of rows to return
346
     * @return Between zero and <param>nbRows</param> RowResults.  Scan is done
351
     * @return Between zero and <param>nbRows</param> RowResults.  Scan is done
347
     * if returned array is of zero-length (We never return null).
352
     * if returned array is of zero-length (We never return null).
348
     * @throws IOException
353
     * @throws IOException
349
     */
354
     */
350
    public Result [] next(int nbRows) throws IOException {
355
    public Result [] next(int nbRows) throws IOException {
351
      // Collect values to be returned here
356
      // Collect values to be returned here
352
      ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
357
      ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
353
      for(int i = 0; i < nbRows; i++) {
358
      for(int i = 0; i < nbRows; i++) {
354
        Result next = next();
359
        Result next = next();
355
        if (next != null) {
360
        if (next != null) {
356
          resultSets.add(next);
361
          resultSets.add(next);
357
        } else {
362
        } else {
358
          break;
363
          break;
359
        }
364
        }
360
      }
365
      }
361
      return resultSets.toArray(new Result[resultSets.size()]);
366
      return resultSets.toArray(new Result[resultSets.size()]);
362
    }
367
    }
363

    
   
368

   
364
    public void close() {
369
    public void close() {
365
      if (callable != null) {
370
      if (callable != null) {
366
        callable.setClose();
371
        callable.setClose();
367
        try {
372
        try {
368
          callable.withRetries();
373
          callable.withRetries();
369
        } catch (IOException e) {
374
        } catch (IOException e) {
370
          // We used to catch this error, interpret, and rethrow. However, we
375
          // We used to catch this error, interpret, and rethrow. However, we
371
          // have since decided that it's not nice for a scanner's close to
376
          // have since decided that it's not nice for a scanner's close to
372
          // throw exceptions. Chances are it was just an UnknownScanner
377
          // throw exceptions. Chances are it was just an UnknownScanner
373
          // exception due to lease time out.
378
          // exception due to lease time out.
374
        } finally {
379
        } finally {
375
          // we want to output the scan metrics even if an error occurred on close
380
          // we want to output the scan metrics even if an error occurred on close
376
          try {
381
          try {
377
            writeScanMetrics();
382
            writeScanMetrics();
378
          } catch (IOException e) {
383
          } catch (IOException e) {
379
            // As above, we still don't want the scanner close() method to throw.
384
            // As above, we still don't want the scanner close() method to throw.
380
          }
385
          }
381
        }
386
        }
382
        callable = null;
387
        callable = null;
383
      }
388
      }
384
      closed = true;
389
      closed = true;
385
    }
390
    }
386
}
391
}
/src/main/java/org/apache/hadoop/hbase/client/Scan.java
Revision 1330680 New Change
 
/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
Revision 1330680 New Change
 
/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
Revision 1330680 New Change
 
/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
Revision 1330680 New Change
 
/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
Revision 1330680 New Change
 
/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java
Revision 1330680 New Change
 
/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
Revision 1330680 New Change
 
/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
Revision 1330680 New Change
 
/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Revision 1330680 New Change
 
/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Revision 1330680 New Change
 
/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
Revision 1330680 New Change
 
/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java
Revision 1330680 New Change
 
/src/main/protobuf/Client.proto
Revision 1330680 New Change
 
/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
Revision 1330680 New Change
 
  1. /src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java: Loading...
  2. /src/main/java/org/apache/hadoop/hbase/client/Scan.java: Loading...
  3. /src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java: Loading...
  4. /src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java: Loading...
  5. /src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java: Loading...
  6. /src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java: Loading...
  7. /src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java: Loading...
  8. /src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java: Loading...
  9. /src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java: Loading...
  10. /src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java: Loading...
  11. /src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java: Loading...
  12. /src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java: Loading...
  13. /src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java: Loading...
  14. /src/main/protobuf/Client.proto: Loading...
  15. /src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java: Loading...