Review Board 1.7.22


HBASE-4145 Provide metrics for hbase client

Review Request #1674 - Created Aug. 29, 2011 and updated

Ming Ma
trunk
hbase-4145
Reviewers
hbase
hbase
1. Collect client-side scan related metrics during scan operation. It is turned off by default.
2. TableInputFormat enables metrics collection on scan and pass the data to mapreduce framework. It only works with new mapreduce APIs that allow TableInputFormat to get access to mapreduce Counter.
3. Clean up some minor issues in tableInputFormat as well as test code.
1. Verified on a small cluster.
2. Existing unit tests.
3. Added new tests.
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
Revision 1176942 New Change
1
/**
1
/**
2
 * Copyright 2010 The Apache Software Foundation
2
 * Copyright 2011 The Apache Software Foundation
3
 *
3
 *
4
 * Licensed to the Apache Software Foundation (ASF) under one
4
 * Licensed to the Apache Software Foundation (ASF) under one
5
 * or more contributor license agreements.  See the NOTICE file
5
 * or more contributor license agreements.  See the NOTICE file
6
 * distributed with this work for additional information
6
 * distributed with this work for additional information
7
 * regarding copyright ownership.  The ASF licenses this file
7
 * regarding copyright ownership.  The ASF licenses this file
8
 * to you under the Apache License, Version 2.0 (the
8
 * to you under the Apache License, Version 2.0 (the
9
 * "License"); you may not use this file except in compliance
9
 * "License"); you may not use this file except in compliance
10
 * with the License.  You may obtain a copy of the License at
10
 * with the License.  You may obtain a copy of the License at
11
 *
11
 *
12
 *     http://www.apache.org/licenses/LICENSE-2.0
12
 *     http://www.apache.org/licenses/LICENSE-2.0
13
 *
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
18
 * limitations under the License.
19
 */
19
 */
20
package org.apache.hadoop.hbase.client;
20
package org.apache.hadoop.hbase.client;
21

    
   
21

   
22
import java.io.Closeable;
22
import java.io.Closeable;
23
import java.io.DataInput;
23
import java.io.DataInput;
24
import java.io.DataOutput;
24
import java.io.DataOutput;
25
import java.io.IOException;
25
import java.io.IOException;
26
import java.lang.reflect.Proxy;
26
import java.lang.reflect.Proxy;
27
import java.util.ArrayList;
27
import java.util.ArrayList;
28
import java.util.Arrays;
28
import java.util.Arrays;
29
import java.util.Iterator;
29
import java.util.Iterator;
30
import java.util.LinkedList;
30
import java.util.LinkedList;
31
import java.util.List;
31
import java.util.List;
32
import java.util.Map;
32
import java.util.Map;
33
import java.util.NavigableMap;
33
import java.util.NavigableMap;
34
import java.util.TreeMap;
34
import java.util.TreeMap;
35
import java.util.concurrent.ExecutorService;
35
import java.util.concurrent.ExecutorService;
36
import java.util.concurrent.SynchronousQueue;
36
import java.util.concurrent.SynchronousQueue;
37
import java.util.concurrent.ThreadFactory;
37
import java.util.concurrent.ThreadFactory;
38
import java.util.concurrent.ThreadPoolExecutor;
38
import java.util.concurrent.ThreadPoolExecutor;
39
import java.util.concurrent.TimeUnit;
39
import java.util.concurrent.TimeUnit;
40
import java.util.concurrent.atomic.AtomicInteger;
40
import java.util.concurrent.atomic.AtomicInteger;
41

    
   
41

   
42
import org.apache.commons.logging.Log;
42
import org.apache.commons.logging.Log;
43
import org.apache.commons.logging.LogFactory;
43
import org.apache.commons.logging.LogFactory;
44
import org.apache.hadoop.conf.Configuration;
44
import org.apache.hadoop.conf.Configuration;
45
import org.apache.hadoop.hbase.DoNotRetryIOException;
45
import org.apache.hadoop.hbase.DoNotRetryIOException;
46
import org.apache.hadoop.hbase.HBaseConfiguration;
46
import org.apache.hadoop.hbase.HBaseConfiguration;
47
import org.apache.hadoop.hbase.HConstants;
47
import org.apache.hadoop.hbase.HConstants;
48
import org.apache.hadoop.hbase.HRegionInfo;
48
import org.apache.hadoop.hbase.HRegionInfo;
49
import org.apache.hadoop.hbase.HRegionLocation;
49
import org.apache.hadoop.hbase.HRegionLocation;
50
import org.apache.hadoop.hbase.HServerAddress;
50
import org.apache.hadoop.hbase.HServerAddress;
51
import org.apache.hadoop.hbase.HTableDescriptor;
51
import org.apache.hadoop.hbase.HTableDescriptor;
52
import org.apache.hadoop.hbase.KeyValue;
52
import org.apache.hadoop.hbase.KeyValue;
53
import org.apache.hadoop.hbase.NotServingRegionException;
53
import org.apache.hadoop.hbase.NotServingRegionException;
54
import org.apache.hadoop.hbase.ServerName;
54
import org.apache.hadoop.hbase.ServerName;
55
import org.apache.hadoop.hbase.UnknownScannerException;
55
import org.apache.hadoop.hbase.UnknownScannerException;
56
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
56
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
57
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
57
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
58
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
58
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;

    
   
59
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
59
import org.apache.hadoop.hbase.client.coprocessor.Batch;
60
import org.apache.hadoop.hbase.client.coprocessor.Batch;
60
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
61
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
61
import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
62
import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
62
import org.apache.hadoop.hbase.util.Addressing;
63
import org.apache.hadoop.hbase.util.Addressing;
63
import org.apache.hadoop.hbase.util.Bytes;
64
import org.apache.hadoop.hbase.util.Bytes;
64
import org.apache.hadoop.hbase.util.Pair;
65
import org.apache.hadoop.hbase.util.Pair;
65
import org.apache.hadoop.hbase.util.Writables;
66
import org.apache.hadoop.hbase.util.Writables;

    
   
67
import org.apache.hadoop.io.DataOutputBuffer;
66

    
   
68

   
67
/**
69
/**
68
 * <p>Used to communicate with a single HBase table.
70
 * <p>Used to communicate with a single HBase table.
69
 *
71
 *
70
 * <p>This class is not thread safe for reads nor write.
72
 * <p>This class is not thread safe for reads nor write.
71
 * 
73
 * 
72
 * <p>In case of writes (Put, Delete), the underlying write buffer can
74
 * <p>In case of writes (Put, Delete), the underlying write buffer can
73
 * be corrupted if multiple threads contend over a single HTable instance.
75
 * be corrupted if multiple threads contend over a single HTable instance.
74
 * 
76
 * 
75
 * <p>In case of reads, some fields used by a Scan are shared among all threads.
77
 * <p>In case of reads, some fields used by a Scan are shared among all threads.
76
 * The HTable implementation can either not contract to be safe in case of a Get
78
 * The HTable implementation can either not contract to be safe in case of a Get
77
 *
79
 *
78
 * <p>To access a table in a multi threaded environment, please consider
80
 * <p>To access a table in a multi threaded environment, please consider
79
 * using the {@link HTablePool} class to create your HTable instances.
81
 * using the {@link HTablePool} class to create your HTable instances.
80
 *
82
 *
81
 * <p>Instances of HTable passed the same {@link Configuration} instance will
83
 * <p>Instances of HTable passed the same {@link Configuration} instance will
82
 * share connections to servers out on the cluster and to the zookeeper ensemble
84
 * share connections to servers out on the cluster and to the zookeeper ensemble
83
 * as well as caches of region locations.  This is usually a *good* thing and it
85
 * as well as caches of region locations.  This is usually a *good* thing and it
84
 * is recommended to reuse the same configuration object for all your tables.
86
 * is recommended to reuse the same configuration object for all your tables.
85
 * This happens because they will all share the same underlying
87
 * This happens because they will all share the same underlying
86
 * {@link HConnection} instance. See {@link HConnectionManager} for more on
88
 * {@link HConnection} instance. See {@link HConnectionManager} for more on
87
 * how this mechanism works.
89
 * how this mechanism works.
88
 *
90
 *
89
 * <p>{@link HConnection} will read most of the
91
 * <p>{@link HConnection} will read most of the
90
 * configuration it needs from the passed {@link Configuration} on initial
92
 * configuration it needs from the passed {@link Configuration} on initial
91
 * construction.  Thereafter, for settings such as
93
 * construction.  Thereafter, for settings such as
92
 * <code>hbase.client.pause</code>, <code>hbase.client.retries.number</code>,
94
 * <code>hbase.client.pause</code>, <code>hbase.client.retries.number</code>,
93
 * and <code>hbase.client.rpc.maxattempts</code> updating their values in the
95
 * and <code>hbase.client.rpc.maxattempts</code> updating their values in the
94
 * passed {@link Configuration} subsequent to {@link HConnection} construction
96
 * passed {@link Configuration} subsequent to {@link HConnection} construction
95
 * will go unnoticed.  To run with changed values, make a new
97
 * will go unnoticed.  To run with changed values, make a new
96
 * {@link HTable} passing a new {@link Configuration} instance that has the
98
 * {@link HTable} passing a new {@link Configuration} instance that has the
97
 * new configuration.
99
 * new configuration.
98
 *
100
 *
99
 * <p>Note that this class implements the {@link Closeable} interface. When a
101
 * <p>Note that this class implements the {@link Closeable} interface. When a
100
 * HTable instance is no longer required, it *should* be closed in order to ensure
102
 * HTable instance is no longer required, it *should* be closed in order to ensure
101
 * that the underlying resources are promptly released. Please note that the close 
103
 * that the underlying resources are promptly released. Please note that the close 
102
 * method can throw java.io.IOException that must be handled.
104
 * method can throw java.io.IOException that must be handled.
103
 *
105
 *
104
 * @see HBaseAdmin for create, drop, list, enable and disable of tables.
106
 * @see HBaseAdmin for create, drop, list, enable and disable of tables.
105
 * @see HConnection
107
 * @see HConnection
106
 * @see HConnectionManager
108
 * @see HConnectionManager
107
 */
109
 */
108
public class HTable implements HTableInterface, Closeable {
110
public class HTable implements HTableInterface, Closeable {
109
  private static final Log LOG = LogFactory.getLog(HTable.class);
111
  private static final Log LOG = LogFactory.getLog(HTable.class);
110
  private HConnection connection;
112
  private HConnection connection;
111
  private final byte [] tableName;
113
  private final byte [] tableName;
112
  protected final int scannerTimeout;
114
  protected final int scannerTimeout;
113
  private volatile Configuration configuration;
115
  private volatile Configuration configuration;
114
  private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
116
  private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
115
  private long writeBufferSize;
117
  private long writeBufferSize;
116
  private boolean clearBufferOnFail;
118
  private boolean clearBufferOnFail;
117
  private boolean autoFlush;
119
  private boolean autoFlush;
118
  private long currentWriteBufferSize;
120
  private long currentWriteBufferSize;
119
  protected int scannerCaching;
121
  protected int scannerCaching;
120
  private int maxKeyValueSize;
122
  private int maxKeyValueSize;
121
  private ExecutorService pool;  // For Multi
123
  private ExecutorService pool;  // For Multi
122
  private long maxScannerResultSize;
124
  private long maxScannerResultSize;
123
  private boolean closed;
125
  private boolean closed;
124
  private int operationTimeout;
126
  private int operationTimeout;
125
  private static final int DOPUT_WB_CHECK = 10;    // i.e., doPut checks the writebuffer every X Puts.
127
  private static final int DOPUT_WB_CHECK = 10;    // i.e., doPut checks the writebuffer every X Puts.
126
  
128
  
127
  /**
129
  /**
128
   * Creates an object to access a HBase table.
130
   * Creates an object to access a HBase table.
129
   * Internally it creates a new instance of {@link Configuration} and a new
131
   * Internally it creates a new instance of {@link Configuration} and a new
130
   * client to zookeeper as well as other resources.  It also comes up with
132
   * client to zookeeper as well as other resources.  It also comes up with
131
   * a fresh view of the cluster and must do discovery from scratch of region
133
   * a fresh view of the cluster and must do discovery from scratch of region
132
   * locations; i.e. it will not make use of already-cached region locations if
134
   * locations; i.e. it will not make use of already-cached region locations if
133
   * available. Use only when being quick and dirty.
135
   * available. Use only when being quick and dirty.
134
   * @throws IOException if a remote or network exception occurs
136
   * @throws IOException if a remote or network exception occurs
135
   * @see #HTable(Configuration, String)
137
   * @see #HTable(Configuration, String)
136
   */
138
   */
137
  public HTable(final String tableName)
139
  public HTable(final String tableName)
138
  throws IOException {
140
  throws IOException {
139
    this(HBaseConfiguration.create(), Bytes.toBytes(tableName));
141
    this(HBaseConfiguration.create(), Bytes.toBytes(tableName));
140
  }
142
  }
141

    
   
143

   
142
  /**
144
  /**
143
   * Creates an object to access a HBase table.
145
   * Creates an object to access a HBase table.
144
   * Internally it creates a new instance of {@link Configuration} and a new
146
   * Internally it creates a new instance of {@link Configuration} and a new
145
   * client to zookeeper as well as other resources.  It also comes up with
147
   * client to zookeeper as well as other resources.  It also comes up with
146
   * a fresh view of the cluster and must do discovery from scratch of region
148
   * a fresh view of the cluster and must do discovery from scratch of region
147
   * locations; i.e. it will not make use of already-cached region locations if
149
   * locations; i.e. it will not make use of already-cached region locations if
148
   * available. Use only when being quick and dirty.
150
   * available. Use only when being quick and dirty.
149
   * @param tableName Name of the table.
151
   * @param tableName Name of the table.
150
   * @throws IOException if a remote or network exception occurs
152
   * @throws IOException if a remote or network exception occurs
151
   * @see #HTable(Configuration, String)
153
   * @see #HTable(Configuration, String)
152
   */
154
   */
153
  public HTable(final byte [] tableName)
155
  public HTable(final byte [] tableName)
154
  throws IOException {
156
  throws IOException {
155
    this(HBaseConfiguration.create(), tableName);
157
    this(HBaseConfiguration.create(), tableName);
156
  }
158
  }
157

    
   
159

   
158
  /**
160
  /**
159
   * Creates an object to access a HBase table.
161
   * Creates an object to access a HBase table.
160
   * Shares zookeeper connection and other resources with other HTable instances
162
   * Shares zookeeper connection and other resources with other HTable instances
161
   * created with the same <code>conf</code> instance.  Uses already-populated
163
   * created with the same <code>conf</code> instance.  Uses already-populated
162
   * region cache if one is available, populated by any other HTable instances
164
   * region cache if one is available, populated by any other HTable instances
163
   * sharing this <code>conf</code> instance.  Recommended.
165
   * sharing this <code>conf</code> instance.  Recommended.
164
   * @param conf Configuration object to use.
166
   * @param conf Configuration object to use.
165
   * @param tableName Name of the table.
167
   * @param tableName Name of the table.
166
   * @throws IOException if a remote or network exception occurs
168
   * @throws IOException if a remote or network exception occurs
167
   */
169
   */
168
  public HTable(Configuration conf, final String tableName)
170
  public HTable(Configuration conf, final String tableName)
169
  throws IOException {
171
  throws IOException {
170
    this(conf, Bytes.toBytes(tableName));
172
    this(conf, Bytes.toBytes(tableName));
171
  }
173
  }
172

    
   
174

   
173

    
   
175

   
174
  /**
176
  /**
175
   * Creates an object to access a HBase table.
177
   * Creates an object to access a HBase table.
176
   * Shares zookeeper connection and other resources with other HTable instances
178
   * Shares zookeeper connection and other resources with other HTable instances
177
   * created with the same <code>conf</code> instance.  Uses already-populated
179
   * created with the same <code>conf</code> instance.  Uses already-populated
178
   * region cache if one is available, populated by any other HTable instances
180
   * region cache if one is available, populated by any other HTable instances
179
   * sharing this <code>conf</code> instance.  Recommended.
181
   * sharing this <code>conf</code> instance.  Recommended.
180
   * @param conf Configuration object to use.
182
   * @param conf Configuration object to use.
181
   * @param tableName Name of the table.
183
   * @param tableName Name of the table.
182
   * @throws IOException if a remote or network exception occurs
184
   * @throws IOException if a remote or network exception occurs
183
   */
185
   */
184
  public HTable(Configuration conf, final byte [] tableName)
186
  public HTable(Configuration conf, final byte [] tableName)
185
  throws IOException {
187
  throws IOException {
186
    this.tableName = tableName;
188
    this.tableName = tableName;
187
    if (conf == null) {
189
    if (conf == null) {
188
      this.scannerTimeout = 0;
190
      this.scannerTimeout = 0;
189
      this.connection = null;
191
      this.connection = null;
190
      return;
192
      return;
191
    }
193
    }
192
    this.connection = HConnectionManager.getConnection(conf);
194
    this.connection = HConnectionManager.getConnection(conf);
193
    this.scannerTimeout =
195
    this.scannerTimeout =
194
      (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
196
      (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
195
    this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
197
    this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
196
        : conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
198
        : conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
197
            HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
199
            HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
198
    this.configuration = conf;
200
    this.configuration = conf;
199
    this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
201
    this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
200
    this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152);
202
    this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152);
201
    this.clearBufferOnFail = true;
203
    this.clearBufferOnFail = true;
202
    this.autoFlush = true;
204
    this.autoFlush = true;
203
    this.currentWriteBufferSize = 0;
205
    this.currentWriteBufferSize = 0;
204
    this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1);
206
    this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1);
205

    
   
207

   
206
    this.maxScannerResultSize = conf.getLong(
208
    this.maxScannerResultSize = conf.getLong(
207
      HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
209
      HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
208
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
210
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
209
    this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
211
    this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
210

    
   
212

   
211
    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
213
    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
212
    if (maxThreads == 0) {
214
    if (maxThreads == 0) {
213
      maxThreads = 1; // is there a better default?
215
      maxThreads = 1; // is there a better default?
214
    }
216
    }
215

    
   
217

   
216
    // Using the "direct handoff" approach, new threads will only be created
218
    // Using the "direct handoff" approach, new threads will only be created
217
    // if it is necessary and will grow unbounded. This could be bad but in HCM
219
    // if it is necessary and will grow unbounded. This could be bad but in HCM
218
    // we only create as many Runnables as there are region servers. It means
220
    // we only create as many Runnables as there are region servers. It means
219
    // it also scales when new region servers are added.
221
    // it also scales when new region servers are added.
220
    this.pool = new ThreadPoolExecutor(1, maxThreads,
222
    this.pool = new ThreadPoolExecutor(1, maxThreads,
221
        60, TimeUnit.SECONDS,
223
        60, TimeUnit.SECONDS,
222
        new SynchronousQueue<Runnable>(),
224
        new SynchronousQueue<Runnable>(),
223
        new DaemonThreadFactory());
225
        new DaemonThreadFactory());
224
    ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
226
    ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
225
    this.closed = false;
227
    this.closed = false;
226
  }
228
  }
227

    
   
229

   
228
  /**
230
  /**
229
   * {@inheritDoc}
231
   * {@inheritDoc}
230
   */
232
   */
231
  @Override
233
  @Override
232
  public Configuration getConfiguration() {
234
  public Configuration getConfiguration() {
233
    return configuration;
235
    return configuration;
234
  }
236
  }
235

    
   
237

   
236
  /**
238
  /**
237
   * Tells whether or not a table is enabled or not.
239
   * Tells whether or not a table is enabled or not.
238
   * @param tableName Name of table to check.
240
   * @param tableName Name of table to check.
239
   * @return {@code true} if table is online.
241
   * @return {@code true} if table is online.
240
   * @throws IOException if a remote or network exception occurs
242
   * @throws IOException if a remote or network exception occurs
241
	* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
243
	* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
242
   */
244
   */
243
  @Deprecated
245
  @Deprecated
244
  public static boolean isTableEnabled(String tableName) throws IOException {
246
  public static boolean isTableEnabled(String tableName) throws IOException {
245
    return isTableEnabled(Bytes.toBytes(tableName));
247
    return isTableEnabled(Bytes.toBytes(tableName));
246
  }
248
  }
247

    
   
249

   
248
  /**
250
  /**
249
   * Tells whether or not a table is enabled or not.
251
   * Tells whether or not a table is enabled or not.
250
   * @param tableName Name of table to check.
252
   * @param tableName Name of table to check.
251
   * @return {@code true} if table is online.
253
   * @return {@code true} if table is online.
252
   * @throws IOException if a remote or network exception occurs
254
   * @throws IOException if a remote or network exception occurs
253
	* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
255
	* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
254
   */
256
   */
255
  @Deprecated
257
  @Deprecated
256
  public static boolean isTableEnabled(byte[] tableName) throws IOException {
258
  public static boolean isTableEnabled(byte[] tableName) throws IOException {
257
    return isTableEnabled(HBaseConfiguration.create(), tableName);
259
    return isTableEnabled(HBaseConfiguration.create(), tableName);
258
  }
260
  }
259

    
   
261

   
260
  /**
262
  /**
261
   * Tells whether or not a table is enabled or not.
263
   * Tells whether or not a table is enabled or not.
262
   * @param conf The Configuration object to use.
264
   * @param conf The Configuration object to use.
263
   * @param tableName Name of table to check.
265
   * @param tableName Name of table to check.
264
   * @return {@code true} if table is online.
266
   * @return {@code true} if table is online.
265
   * @throws IOException if a remote or network exception occurs
267
   * @throws IOException if a remote or network exception occurs
266
	* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
268
	* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
267
   */
269
   */
268
  @Deprecated
270
  @Deprecated
269
  public static boolean isTableEnabled(Configuration conf, String tableName)
271
  public static boolean isTableEnabled(Configuration conf, String tableName)
270
  throws IOException {
272
  throws IOException {
271
    return isTableEnabled(conf, Bytes.toBytes(tableName));
273
    return isTableEnabled(conf, Bytes.toBytes(tableName));
272
  }
274
  }
273

    
   
275

   
274
  /**
276
  /**
275
   * Tells whether or not a table is enabled or not.
277
   * Tells whether or not a table is enabled or not.
276
   * @param conf The Configuration object to use.
278
   * @param conf The Configuration object to use.
277
   * @param tableName Name of table to check.
279
   * @param tableName Name of table to check.
278
   * @return {@code true} if table is online.
280
   * @return {@code true} if table is online.
279
   * @throws IOException if a remote or network exception occurs
281
   * @throws IOException if a remote or network exception occurs
280
   */
282
   */
281
  public static boolean isTableEnabled(Configuration conf,
283
  public static boolean isTableEnabled(Configuration conf,
282
      final byte[] tableName) throws IOException {
284
      final byte[] tableName) throws IOException {
283
    return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
285
    return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
284
      @Override
286
      @Override
285
      public Boolean connect(HConnection connection) throws IOException {
287
      public Boolean connect(HConnection connection) throws IOException {
286
        return connection.isTableEnabled(tableName);
288
        return connection.isTableEnabled(tableName);
287
      }
289
      }
288
    });
290
    });
289
  }
291
  }
290

    
   
292

   
291
  /**
293
  /**
292
   * Find region location hosting passed row using cached info
294
   * Find region location hosting passed row using cached info
293
   * @param row Row to find.
295
   * @param row Row to find.
294
   * @return The location of the given row.
296
   * @return The location of the given row.
295
   * @throws IOException if a remote or network exception occurs
297
   * @throws IOException if a remote or network exception occurs
296
   */
298
   */
297
  public HRegionLocation getRegionLocation(final String row)
299
  public HRegionLocation getRegionLocation(final String row)
298
  throws IOException {
300
  throws IOException {
299
    return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
301
    return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
300
  }
302
  }
301

    
   
303

   
302
  /**
304
  /**
303
   * Finds the region on which the given row is being served.
305
   * Finds the region on which the given row is being served.
304
   * @param row Row to find.
306
   * @param row Row to find.
305
   * @return Location of the row.
307
   * @return Location of the row.
306
   * @throws IOException if a remote or network exception occurs
308
   * @throws IOException if a remote or network exception occurs
307
   */
309
   */
308
  public HRegionLocation getRegionLocation(final byte [] row)
310
  public HRegionLocation getRegionLocation(final byte [] row)
309
  throws IOException {
311
  throws IOException {
310
    return connection.getRegionLocation(tableName, row, false);
312
    return connection.getRegionLocation(tableName, row, false);
311
  }
313
  }
312

    
   
314

   
313
  /**
315
  /**
314
   * {@inheritDoc}
316
   * {@inheritDoc}
315
   */
317
   */
316
  @Override
318
  @Override
317
  public byte [] getTableName() {
319
  public byte [] getTableName() {
318
    return this.tableName;
320
    return this.tableName;
319
  }
321
  }
320

    
   
322

   
321
  /**
323
  /**
322
   * <em>INTERNAL</em> Used by unit tests and tools to do low-level
324
   * <em>INTERNAL</em> Used by unit tests and tools to do low-level
323
   * manipulations.
325
   * manipulations.
324
   * @return An HConnection instance.
326
   * @return An HConnection instance.
325
   * @deprecated This method will be changed from public to package protected.
327
   * @deprecated This method will be changed from public to package protected.
326
   */
328
   */
327
  // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
329
  // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
328
  public HConnection getConnection() {
330
  public HConnection getConnection() {
329
    return this.connection;
331
    return this.connection;
330
  }
332
  }
331

    
   
333

   
332
  /**
334
  /**
333
   * Gets the number of rows that a scanner will fetch at once.
335
   * Gets the number of rows that a scanner will fetch at once.
334
   * <p>
336
   * <p>
335
   * The default value comes from {@code hbase.client.scanner.caching}.
337
   * The default value comes from {@code hbase.client.scanner.caching}.
336
   */
338
   */
337
  public int getScannerCaching() {
339
  public int getScannerCaching() {
338
    return scannerCaching;
340
    return scannerCaching;
339
  }
341
  }
340

    
   
342

   
341
  /**
343
  /**
342
   * Sets the number of rows that a scanner will fetch at once.
344
   * Sets the number of rows that a scanner will fetch at once.
343
   * <p>
345
   * <p>
344
   * This will override the value specified by
346
   * This will override the value specified by
345
   * {@code hbase.client.scanner.caching}.
347
   * {@code hbase.client.scanner.caching}.
346
   * Increasing this value will reduce the amount of work needed each time
348
   * Increasing this value will reduce the amount of work needed each time
347
   * {@code next()} is called on a scanner, at the expense of memory use
349
   * {@code next()} is called on a scanner, at the expense of memory use
348
   * (since more rows will need to be maintained in memory by the scanners).
350
   * (since more rows will need to be maintained in memory by the scanners).
349
   * @param scannerCaching the number of rows a scanner will fetch at once.
351
   * @param scannerCaching the number of rows a scanner will fetch at once.
350
   */
352
   */
351
  public void setScannerCaching(int scannerCaching) {
353
  public void setScannerCaching(int scannerCaching) {
352
    this.scannerCaching = scannerCaching;
354
    this.scannerCaching = scannerCaching;
353
  }
355
  }
354

    
   
356

   
355
  /**
357
  /**
356
   * {@inheritDoc}
358
   * {@inheritDoc}
357
   */
359
   */
358
  @Override
360
  @Override
359
  public HTableDescriptor getTableDescriptor() throws IOException {
361
  public HTableDescriptor getTableDescriptor() throws IOException {
360
    return new UnmodifyableHTableDescriptor(
362
    return new UnmodifyableHTableDescriptor(
361
      this.connection.getHTableDescriptor(this.tableName));
363
      this.connection.getHTableDescriptor(this.tableName));
362
  }
364
  }
363

    
   
365

   
364
  /**
366
  /**
365
   * Gets the starting row key for every region in the currently open table.
367
   * Gets the starting row key for every region in the currently open table.
366
   * <p>
368
   * <p>
367
   * This is mainly useful for the MapReduce integration.
369
   * This is mainly useful for the MapReduce integration.
368
   * @return Array of region starting row keys
370
   * @return Array of region starting row keys
369
   * @throws IOException if a remote or network exception occurs
371
   * @throws IOException if a remote or network exception occurs
370
   */
372
   */
371
  public byte [][] getStartKeys() throws IOException {
373
  public byte [][] getStartKeys() throws IOException {
372
    return getStartEndKeys().getFirst();
374
    return getStartEndKeys().getFirst();
373
  }
375
  }
374

    
   
376

   
375
  /**
377
  /**
376
   * Gets the ending row key for every region in the currently open table.
378
   * Gets the ending row key for every region in the currently open table.
377
   * <p>
379
   * <p>
378
   * This is mainly useful for the MapReduce integration.
380
   * This is mainly useful for the MapReduce integration.
379
   * @return Array of region ending row keys
381
   * @return Array of region ending row keys
380
   * @throws IOException if a remote or network exception occurs
382
   * @throws IOException if a remote or network exception occurs
381
   */
383
   */
382
  public byte[][] getEndKeys() throws IOException {
384
  public byte[][] getEndKeys() throws IOException {
383
    return getStartEndKeys().getSecond();
385
    return getStartEndKeys().getSecond();
384
  }
386
  }
385

    
   
387

   
386
  /**
388
  /**
387
   * Gets the starting and ending row keys for every region in the currently
389
   * Gets the starting and ending row keys for every region in the currently
388
   * open table.
390
   * open table.
389
   * <p>
391
   * <p>
390
   * This is mainly useful for the MapReduce integration.
392
   * This is mainly useful for the MapReduce integration.
391
   * @return Pair of arrays of region starting and ending row keys
393
   * @return Pair of arrays of region starting and ending row keys
392
   * @throws IOException if a remote or network exception occurs
394
   * @throws IOException if a remote or network exception occurs
393
   */
395
   */
394
  @SuppressWarnings("unchecked")
396
  @SuppressWarnings("unchecked")
395
  public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
397
  public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
396
    final List<byte[]> startKeyList = new ArrayList<byte[]>();
398
    final List<byte[]> startKeyList = new ArrayList<byte[]>();
397
    final List<byte[]> endKeyList = new ArrayList<byte[]>();
399
    final List<byte[]> endKeyList = new ArrayList<byte[]>();
398
    MetaScannerVisitor visitor = new MetaScannerVisitor() {
400
    MetaScannerVisitor visitor = new MetaScannerVisitor() {
399
      public boolean processRow(Result rowResult) throws IOException {
401
      public boolean processRow(Result rowResult) throws IOException {
400
        byte [] bytes = rowResult.getValue(HConstants.CATALOG_FAMILY,
402
        byte [] bytes = rowResult.getValue(HConstants.CATALOG_FAMILY,
401
          HConstants.REGIONINFO_QUALIFIER);
403
          HConstants.REGIONINFO_QUALIFIER);
402
        if (bytes == null) {
404
        if (bytes == null) {
403
          LOG.warn("Null " + HConstants.REGIONINFO_QUALIFIER + " cell in " +
405
          LOG.warn("Null " + HConstants.REGIONINFO_QUALIFIER + " cell in " +
404
            rowResult);
406
            rowResult);
405
          return true;
407
          return true;
406
        }
408
        }
407
        HRegionInfo info = Writables.getHRegionInfo(bytes);
409
        HRegionInfo info = Writables.getHRegionInfo(bytes);
408
        if (Bytes.equals(info.getTableName(), getTableName())) {
410
        if (Bytes.equals(info.getTableName(), getTableName())) {
409
          if (!(info.isOffline() || info.isSplit())) {
411
          if (!(info.isOffline() || info.isSplit())) {
410
            startKeyList.add(info.getStartKey());
412
            startKeyList.add(info.getStartKey());
411
            endKeyList.add(info.getEndKey());
413
            endKeyList.add(info.getEndKey());
412
          }
414
          }
413
        }
415
        }
414
        return true;
416
        return true;
415
      }
417
      }
416
    };
418
    };
417
    MetaScanner.metaScan(configuration, visitor, this.tableName);
419
    MetaScanner.metaScan(configuration, visitor, this.tableName);
418
    return new Pair(startKeyList.toArray(new byte[startKeyList.size()][]),
420
    return new Pair(startKeyList.toArray(new byte[startKeyList.size()][]),
419
                endKeyList.toArray(new byte[endKeyList.size()][]));
421
                endKeyList.toArray(new byte[endKeyList.size()][]));
420
  }
422
  }
421

    
   
423

   
422
  /**
424
  /**
423
   * Gets all the regions and their address for this table.
425
   * Gets all the regions and their address for this table.
424
   * @return A map of HRegionInfo with it's server address
426
   * @return A map of HRegionInfo with it's server address
425
   * @throws IOException if a remote or network exception occurs
427
   * @throws IOException if a remote or network exception occurs
426
   * @deprecated Use {@link #getRegionLocations()} or {@link #getStartEndKeys()}
428
   * @deprecated Use {@link #getRegionLocations()} or {@link #getStartEndKeys()}
427
   */
429
   */
428
  public Map<HRegionInfo, HServerAddress> getRegionsInfo() throws IOException {
430
  public Map<HRegionInfo, HServerAddress> getRegionsInfo() throws IOException {
429
    final Map<HRegionInfo, HServerAddress> regionMap =
431
    final Map<HRegionInfo, HServerAddress> regionMap =
430
      new TreeMap<HRegionInfo, HServerAddress>();
432
      new TreeMap<HRegionInfo, HServerAddress>();
431

    
   
433

   
432
    MetaScannerVisitor visitor = new MetaScannerVisitor() {
434
    MetaScannerVisitor visitor = new MetaScannerVisitor() {
433
      public boolean processRow(Result rowResult) throws IOException {
435
      public boolean processRow(Result rowResult) throws IOException {
434
        HRegionInfo info = Writables.getHRegionInfo(
436
        HRegionInfo info = Writables.getHRegionInfo(
435
            rowResult.getValue(HConstants.CATALOG_FAMILY,
437
            rowResult.getValue(HConstants.CATALOG_FAMILY,
436
                HConstants.REGIONINFO_QUALIFIER));
438
                HConstants.REGIONINFO_QUALIFIER));
437

    
   
439

   
438
        if (!(Bytes.equals(info.getTableName(), getTableName()))) {
440
        if (!(Bytes.equals(info.getTableName(), getTableName()))) {
439
          return false;
441
          return false;
440
        }
442
        }
441

    
   
443

   
442
        HServerAddress server = new HServerAddress();
444
        HServerAddress server = new HServerAddress();
443
        byte [] value = rowResult.getValue(HConstants.CATALOG_FAMILY,
445
        byte [] value = rowResult.getValue(HConstants.CATALOG_FAMILY,
444
            HConstants.SERVER_QUALIFIER);
446
            HConstants.SERVER_QUALIFIER);
445
        if (value != null && value.length > 0) {
447
        if (value != null && value.length > 0) {
446
          String hostAndPort = Bytes.toString(value);
448
          String hostAndPort = Bytes.toString(value);
447
          server = new HServerAddress(Addressing.createInetSocketAddressFromHostAndPortStr(hostAndPort));
449
          server = new HServerAddress(Addressing.createInetSocketAddressFromHostAndPortStr(hostAndPort));
448
        }
450
        }
449

    
   
451

   
450
        if (!(info.isOffline() || info.isSplit())) {
452
        if (!(info.isOffline() || info.isSplit())) {
451
          regionMap.put(new UnmodifyableHRegionInfo(info), server);
453
          regionMap.put(new UnmodifyableHRegionInfo(info), server);
452
        }
454
        }
453
        return true;
455
        return true;
454
      }
456
      }
455

    
   
457

   
456
    };
458
    };
457
    MetaScanner.metaScan(configuration, visitor, tableName);
459
    MetaScanner.metaScan(configuration, visitor, tableName);
458
    return regionMap;
460
    return regionMap;
459
  }
461
  }
460

    
   
462

   
461
  /**
463
  /**
462
   * Gets all the regions and their address for this table.
464
   * Gets all the regions and their address for this table.
463
   * <p>
465
   * <p>
464
   * This is mainly useful for the MapReduce integration.
466
   * This is mainly useful for the MapReduce integration.
465
   * @return A map of HRegionInfo with it's server address
467
   * @return A map of HRegionInfo with it's server address
466
   * @throws IOException if a remote or network exception occurs
468
   * @throws IOException if a remote or network exception occurs
467
   */
469
   */
468
  public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
470
  public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
469
    return MetaScanner.allTableRegions(getConfiguration(), getTableName(), false);
471
    return MetaScanner.allTableRegions(getConfiguration(), getTableName(), false);
470
  }
472
  }
471

    
   
473

   
472
  /**
474
  /**
473
   * Save the passed region information and the table's regions
475
   * Save the passed region information and the table's regions
474
   * cache.
476
   * cache.
475
   * <p>
477
   * <p>
476
   * This is mainly useful for the MapReduce integration. You can call
478
   * This is mainly useful for the MapReduce integration. You can call
477
   * {@link #deserializeRegionInfo deserializeRegionInfo}
479
   * {@link #deserializeRegionInfo deserializeRegionInfo}
478
   * to deserialize regions information from a
480
   * to deserialize regions information from a
479
   * {@link DataInput}, then call this method to load them to cache.
481
   * {@link DataInput}, then call this method to load them to cache.
480
   *
482
   *
481
   * <pre>
483
   * <pre>
482
   * {@code
484
   * {@code
483
   * HTable t1 = new HTable("foo");
485
   * HTable t1 = new HTable("foo");
484
   * FileInputStream fis = new FileInputStream("regions.dat");
486
   * FileInputStream fis = new FileInputStream("regions.dat");
485
   * DataInputStream dis = new DataInputStream(fis);
487
   * DataInputStream dis = new DataInputStream(fis);
486
   *
488
   *
487
   * Map<HRegionInfo, HServerAddress> hm = t1.deserializeRegionInfo(dis);
489
   * Map<HRegionInfo, HServerAddress> hm = t1.deserializeRegionInfo(dis);
488
   * t1.prewarmRegionCache(hm);
490
   * t1.prewarmRegionCache(hm);
489
   * }
491
   * }
490
   * </pre>
492
   * </pre>
491
   * @param regionMap This piece of regions information will be loaded
493
   * @param regionMap This piece of regions information will be loaded
492
   * to region cache.
494
   * to region cache.
493
   */
495
   */
494
  public void prewarmRegionCache(Map<HRegionInfo, HServerAddress> regionMap) {
496
  public void prewarmRegionCache(Map<HRegionInfo, HServerAddress> regionMap) {
495
    this.connection.prewarmRegionCache(this.getTableName(), regionMap);
497
    this.connection.prewarmRegionCache(this.getTableName(), regionMap);
496
  }
498
  }
497

    
   
499

   
498
  /**
500
  /**
499
   * Serialize the regions information of this table and output
501
   * Serialize the regions information of this table and output
500
   * to <code>out</code>.
502
   * to <code>out</code>.
501
   * <p>
503
   * <p>
502
   * This is mainly useful for the MapReduce integration. A client could
504
   * This is mainly useful for the MapReduce integration. A client could
503
   * perform a large scan for all the regions for the table, serialize the
505
   * perform a large scan for all the regions for the table, serialize the
504
   * region info to a file. MR job can ship a copy of the meta for the table in
506
   * region info to a file. MR job can ship a copy of the meta for the table in
505
   * the DistributedCache.
507
   * the DistributedCache.
506
   * <pre>
508
   * <pre>
507
   * {@code
509
   * {@code
508
   * FileOutputStream fos = new FileOutputStream("regions.dat");
510
   * FileOutputStream fos = new FileOutputStream("regions.dat");
509
   * DataOutputStream dos = new DataOutputStream(fos);
511
   * DataOutputStream dos = new DataOutputStream(fos);
510
   * table.serializeRegionInfo(dos);
512
   * table.serializeRegionInfo(dos);
511
   * dos.flush();
513
   * dos.flush();
512
   * dos.close();
514
   * dos.close();
513
   * }
515
   * }
514
   * </pre>
516
   * </pre>
515
   * @param out {@link DataOutput} to serialize this object into.
517
   * @param out {@link DataOutput} to serialize this object into.
516
   * @throws IOException if a remote or network exception occurs
518
   * @throws IOException if a remote or network exception occurs
517
   */
519
   */
518
  public void serializeRegionInfo(DataOutput out) throws IOException {
520
  public void serializeRegionInfo(DataOutput out) throws IOException {
519
    Map<HRegionInfo, HServerAddress> allRegions = this.getRegionsInfo();
521
    Map<HRegionInfo, HServerAddress> allRegions = this.getRegionsInfo();
520
    // first, write number of regions
522
    // first, write number of regions
521
    out.writeInt(allRegions.size());
523
    out.writeInt(allRegions.size());
522
    for (Map.Entry<HRegionInfo, HServerAddress> es : allRegions.entrySet()) {
524
    for (Map.Entry<HRegionInfo, HServerAddress> es : allRegions.entrySet()) {
523
      es.getKey().write(out);
525
      es.getKey().write(out);
524
      es.getValue().write(out);
526
      es.getValue().write(out);
525
    }
527
    }
526
  }
528
  }
527

    
   
529

   
528
  /**
530
  /**
529
   * Read from <code>in</code> and deserialize the regions information.
531
   * Read from <code>in</code> and deserialize the regions information.
530
   *
532
   *
531
   * <p>It behaves similarly as {@link #getRegionsInfo getRegionsInfo}, except
533
   * <p>It behaves similarly as {@link #getRegionsInfo getRegionsInfo}, except
532
   * that it loads the region map from a {@link DataInput} object.
534
   * that it loads the region map from a {@link DataInput} object.
533
   *
535
   *
534
   * <p>It is supposed to be followed immediately by  {@link
536
   * <p>It is supposed to be followed immediately by  {@link
535
   * #prewarmRegionCache prewarmRegionCache}.
537
   * #prewarmRegionCache prewarmRegionCache}.
536
   *
538
   *
537
   * <p>
539
   * <p>
538
   * Please refer to {@link #prewarmRegionCache prewarmRegionCache} for usage.
540
   * Please refer to {@link #prewarmRegionCache prewarmRegionCache} for usage.
539
   *
541
   *
540
   * @param in {@link DataInput} object.
542
   * @param in {@link DataInput} object.
541
   * @return A map of HRegionInfo with its server address.
543
   * @return A map of HRegionInfo with its server address.
542
   * @throws IOException if an I/O exception occurs.
544
   * @throws IOException if an I/O exception occurs.
543
   */
545
   */
544
  public Map<HRegionInfo, HServerAddress> deserializeRegionInfo(DataInput in)
546
  public Map<HRegionInfo, HServerAddress> deserializeRegionInfo(DataInput in)
545
  throws IOException {
547
  throws IOException {
546
    final Map<HRegionInfo, HServerAddress> allRegions =
548
    final Map<HRegionInfo, HServerAddress> allRegions =
547
      new TreeMap<HRegionInfo, HServerAddress>();
549
      new TreeMap<HRegionInfo, HServerAddress>();
548

    
   
550

   
549
    // the first integer is expected to be the size of records
551
    // the first integer is expected to be the size of records
550
    int regionsCount = in.readInt();
552
    int regionsCount = in.readInt();
551
    for (int i = 0; i < regionsCount; ++i) {
553
    for (int i = 0; i < regionsCount; ++i) {
552
      HRegionInfo hri = new HRegionInfo();
554
      HRegionInfo hri = new HRegionInfo();
553
      hri.readFields(in);
555
      hri.readFields(in);
554
      HServerAddress hsa = new HServerAddress();
556
      HServerAddress hsa = new HServerAddress();
555
      hsa.readFields(in);
557
      hsa.readFields(in);
556
      allRegions.put(hri, hsa);
558
      allRegions.put(hri, hsa);
557
    }
559
    }
558
    return allRegions;
560
    return allRegions;
559
  }
561
  }
560

    
   
562

   
561
  /**
563
  /**
562
   * {@inheritDoc}
564
   * {@inheritDoc}
563
   */
565
   */
564
   @Override
566
   @Override
565
   public Result getRowOrBefore(final byte[] row, final byte[] family)
567
   public Result getRowOrBefore(final byte[] row, final byte[] family)
566
   throws IOException {
568
   throws IOException {
567
     return connection.getRegionServerWithRetries(
569
     return connection.getRegionServerWithRetries(
568
         new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
570
         new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
569
       public Result call() throws IOException {
571
       public Result call() throws IOException {
570
         return server.getClosestRowBefore(location.getRegionInfo().getRegionName(),
572
         return server.getClosestRowBefore(location.getRegionInfo().getRegionName(),
571
           row, family);
573
           row, family);
572
       }
574
       }
573
     });
575
     });
574
   }
576
   }
575

    
   
577

   
576
   /**
578
   /**
577
    * {@inheritDoc}
579
    * {@inheritDoc}
578
    */
580
    */
579
  @Override
581
  @Override
580
  public ResultScanner getScanner(final Scan scan) throws IOException {
582
  public ResultScanner getScanner(final Scan scan) throws IOException {
581
    ClientScanner s = new ClientScanner(scan);
583
    ClientScanner s = new ClientScanner(scan);
582
    s.initialize();
584
    s.initialize();
583
    return s;
585
    return s;
584
  }
586
  }
585

    
   
587

   
586
  /**
588
  /**
587
   * {@inheritDoc}
589
   * {@inheritDoc}
588
   */
590
   */
589
  @Override
591
  @Override
590
  public ResultScanner getScanner(byte [] family) throws IOException {
592
  public ResultScanner getScanner(byte [] family) throws IOException {
591
    Scan scan = new Scan();
593
    Scan scan = new Scan();
592
    scan.addFamily(family);
594
    scan.addFamily(family);
593
    return getScanner(scan);
595
    return getScanner(scan);
594
  }
596
  }
595

    
   
597

   
596
  /**
598
  /**
597
   * {@inheritDoc}
599
   * {@inheritDoc}
598
   */
600
   */
599
  @Override
601
  @Override
600
  public ResultScanner getScanner(byte [] family, byte [] qualifier)
602
  public ResultScanner getScanner(byte [] family, byte [] qualifier)
601
  throws IOException {
603
  throws IOException {
602
    Scan scan = new Scan();
604
    Scan scan = new Scan();
603
    scan.addColumn(family, qualifier);
605
    scan.addColumn(family, qualifier);
604
    return getScanner(scan);
606
    return getScanner(scan);
605
  }
607
  }
606

    
   
608

   
607
  /**
609
  /**
608
   * {@inheritDoc}
610
   * {@inheritDoc}
609
   */
611
   */
610
  @Override
612
  @Override
611
  public Result get(final Get get) throws IOException {
613
  public Result get(final Get get) throws IOException {
612
    return connection.getRegionServerWithRetries(
614
    return connection.getRegionServerWithRetries(
613
        new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
615
        new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
614
          public Result call() throws IOException {
616
          public Result call() throws IOException {
615
            return server.get(location.getRegionInfo().getRegionName(), get);
617
            return server.get(location.getRegionInfo().getRegionName(), get);
616
          }
618
          }
617
        }
619
        }
618
    );
620
    );
619
  }
621
  }
620

    
   
622

   
621
  /**
623
  /**
622
   * {@inheritDoc}
624
   * {@inheritDoc}
623
   */
625
   */
624
  @Override
626
  @Override
625
  public Result[] get(List<Get> gets) throws IOException {
627
  public Result[] get(List<Get> gets) throws IOException {
626
    try {
628
    try {
627
      Object [] r1 = batch((List)gets);
629
      Object [] r1 = batch((List)gets);
628

    
   
630

   
629
      // translate.
631
      // translate.
630
      Result [] results = new Result[r1.length];
632
      Result [] results = new Result[r1.length];
631
      int i=0;
633
      int i=0;
632
      for (Object o : r1) {
634
      for (Object o : r1) {
633
        // batch ensures if there is a failure we get an exception instead
635
        // batch ensures if there is a failure we get an exception instead
634
        results[i++] = (Result) o;
636
        results[i++] = (Result) o;
635
      }
637
      }
636

    
   
638

   
637
      return results;
639
      return results;
638
    } catch (InterruptedException e) {
640
    } catch (InterruptedException e) {
639
      throw new IOException(e);
641
      throw new IOException(e);
640
    }
642
    }
641
  }
643
  }
642

    
   
644

   
643
  /**
645
  /**
644
   * {@inheritDoc}
646
   * {@inheritDoc}
645
   */
647
   */
646
  @Override
648
  @Override
647
  public synchronized void batch(final List<Row> actions, final Object[] results)
649
  public synchronized void batch(final List<Row> actions, final Object[] results)
648
      throws InterruptedException, IOException {
650
      throws InterruptedException, IOException {
649
    connection.processBatch(actions, tableName, pool, results);
651
    connection.processBatch(actions, tableName, pool, results);
650
  }
652
  }
651

    
   
653

   
652
  /**
654
  /**
653
   * {@inheritDoc}
655
   * {@inheritDoc}
654
   */
656
   */
655
  @Override
657
  @Override
656
  public synchronized Object[] batch(final List<Row> actions) throws InterruptedException, IOException {
658
  public synchronized Object[] batch(final List<Row> actions) throws InterruptedException, IOException {
657
    Object[] results = new Object[actions.size()];
659
    Object[] results = new Object[actions.size()];
658
    connection.processBatch(actions, tableName, pool, results);
660
    connection.processBatch(actions, tableName, pool, results);
659
    return results;
661
    return results;
660
  }
662
  }
661

    
   
663

   
662
  /**
664
  /**
663
   * {@inheritDoc}
665
   * {@inheritDoc}
664
   */
666
   */
665
  @Override
667
  @Override
666
  public void delete(final Delete delete)
668
  public void delete(final Delete delete)
667
  throws IOException {
669
  throws IOException {
668
    connection.getRegionServerWithRetries(
670
    connection.getRegionServerWithRetries(
669
        new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
671
        new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
670
          public Boolean call() throws IOException {
672
          public Boolean call() throws IOException {
671
            server.delete(location.getRegionInfo().getRegionName(), delete);
673
            server.delete(location.getRegionInfo().getRegionName(), delete);
672
            return null; // FindBugs NP_BOOLEAN_RETURN_NULL
674
            return null; // FindBugs NP_BOOLEAN_RETURN_NULL
673
          }
675
          }
674
        }
676
        }
675
    );
677
    );
676
  }
678
  }
677

    
   
679

   
678
  /**
680
  /**
679
   * {@inheritDoc}
681
   * {@inheritDoc}
680
   */
682
   */
681
  @Override
683
  @Override
682
  public void delete(final List<Delete> deletes)
684
  public void delete(final List<Delete> deletes)
683
  throws IOException {
685
  throws IOException {
684
    Object[] results = new Object[deletes.size()];
686
    Object[] results = new Object[deletes.size()];
685
    try {
687
    try {
686
      connection.processBatch((List) deletes, tableName, pool, results);
688
      connection.processBatch((List) deletes, tableName, pool, results);
687
    } catch (InterruptedException e) {
689
    } catch (InterruptedException e) {
688
      throw new IOException(e);
690
      throw new IOException(e);
689
    } finally {
691
    } finally {
690
      // mutate list so that it is empty for complete success, or contains only failed records
692
      // mutate list so that it is empty for complete success, or contains only failed records
691
      // results are returned in the same order as the requests in list
693
      // results are returned in the same order as the requests in list
692
      // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
694
      // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
693
      for (int i = results.length - 1; i>=0; i--) {
695
      for (int i = results.length - 1; i>=0; i--) {
694
        // if result is not null, it succeeded
696
        // if result is not null, it succeeded
695
        if (results[i] instanceof Result) {
697
        if (results[i] instanceof Result) {
696
          deletes.remove(i);
698
          deletes.remove(i);
697
        }
699
        }
698
      }
700
      }
699
    }
701
    }
700
  }
702
  }
701

    
   
703

   
702
  /**
704
  /**
703
   * {@inheritDoc}
705
   * {@inheritDoc}
704
   */
706
   */
705
  @Override
707
  @Override
706
  public void put(final Put put) throws IOException {
708
  public void put(final Put put) throws IOException {
707
    doPut(Arrays.asList(put));
709
    doPut(Arrays.asList(put));
708
  }
710
  }
709

    
   
711

   
710
  /**
712
  /**
711
   * {@inheritDoc}
713
   * {@inheritDoc}
712
   */
714
   */
713
  @Override
715
  @Override
714
  public void put(final List<Put> puts) throws IOException {
716
  public void put(final List<Put> puts) throws IOException {
715
    doPut(puts);
717
    doPut(puts);
716
  }
718
  }
717

    
   
719

   
718
  private void doPut(final List<Put> puts) throws IOException {
720
  private void doPut(final List<Put> puts) throws IOException {
719
    int n = 0;
721
    int n = 0;
720
    for (Put put : puts) {
722
    for (Put put : puts) {
721
      validatePut(put);
723
      validatePut(put);
722
      writeBuffer.add(put);
724
      writeBuffer.add(put);
723
      currentWriteBufferSize += put.heapSize();
725
      currentWriteBufferSize += put.heapSize();
724
     
726
     
725
      // we need to periodically see if the writebuffer is full instead of waiting until the end of the List
727
      // we need to periodically see if the writebuffer is full instead of waiting until the end of the List
726
      n++;
728
      n++;
727
      if (n % DOPUT_WB_CHECK == 0 && currentWriteBufferSize > writeBufferSize) {
729
      if (n % DOPUT_WB_CHECK == 0 && currentWriteBufferSize > writeBufferSize) {
728
        flushCommits();
730
        flushCommits();
729
      }
731
      }
730
    }
732
    }
731
    if (autoFlush || currentWriteBufferSize > writeBufferSize) {
733
    if (autoFlush || currentWriteBufferSize > writeBufferSize) {
732
      flushCommits();
734
      flushCommits();
733
    }
735
    }
734
  }
736
  }
735

    
   
737

   
736
  /**
738
  /**
737
   * {@inheritDoc}
739
   * {@inheritDoc}
738
   */
740
   */
739
  @Override
741
  @Override
740
  public Result increment(final Increment increment) throws IOException {
742
  public Result increment(final Increment increment) throws IOException {
741
    if (!increment.hasFamilies()) {
743
    if (!increment.hasFamilies()) {
742
      throw new IOException(
744
      throw new IOException(
743
          "Invalid arguments to increment, no columns specified");
745
          "Invalid arguments to increment, no columns specified");
744
    }
746
    }
745
    return connection.getRegionServerWithRetries(
747
    return connection.getRegionServerWithRetries(
746
        new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
748
        new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
747
          public Result call() throws IOException {
749
          public Result call() throws IOException {
748
            return server.increment(
750
            return server.increment(
749
                location.getRegionInfo().getRegionName(), increment);
751
                location.getRegionInfo().getRegionName(), increment);
750
          }
752
          }
751
        }
753
        }
752
    );
754
    );
753
  }
755
  }
754

    
   
756

   
755
  /**
757
  /**
756
   * {@inheritDoc}
758
   * {@inheritDoc}
757
   */
759
   */
758
  @Override
760
  @Override
759
  public long incrementColumnValue(final byte [] row, final byte [] family,
761
  public long incrementColumnValue(final byte [] row, final byte [] family,
760
      final byte [] qualifier, final long amount)
762
      final byte [] qualifier, final long amount)
761
  throws IOException {
763
  throws IOException {
762
    return incrementColumnValue(row, family, qualifier, amount, true);
764
    return incrementColumnValue(row, family, qualifier, amount, true);
763
  }
765
  }
764

    
   
766

   
765
  /**
767
  /**
766
   * {@inheritDoc}
768
   * {@inheritDoc}
767
   */
769
   */
768
  @Override
770
  @Override
769
  public long incrementColumnValue(final byte [] row, final byte [] family,
771
  public long incrementColumnValue(final byte [] row, final byte [] family,
770
      final byte [] qualifier, final long amount, final boolean writeToWAL)
772
      final byte [] qualifier, final long amount, final boolean writeToWAL)
771
  throws IOException {
773
  throws IOException {
772
    NullPointerException npe = null;
774
    NullPointerException npe = null;
773
    if (row == null) {
775
    if (row == null) {
774
      npe = new NullPointerException("row is null");
776
      npe = new NullPointerException("row is null");
775
    } else if (family == null) {
777
    } else if (family == null) {
776
      npe = new NullPointerException("column is null");
778
      npe = new NullPointerException("column is null");
777
    }
779
    }
778
    if (npe != null) {
780
    if (npe != null) {
779
      throw new IOException(
781
      throw new IOException(
780
          "Invalid arguments to incrementColumnValue", npe);
782
          "Invalid arguments to incrementColumnValue", npe);
781
    }
783
    }
782
    return connection.getRegionServerWithRetries(
784
    return connection.getRegionServerWithRetries(
783
        new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
785
        new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
784
          public Long call() throws IOException {
786
          public Long call() throws IOException {
785
            return server.incrementColumnValue(
787
            return server.incrementColumnValue(
786
                location.getRegionInfo().getRegionName(), row, family,
788
                location.getRegionInfo().getRegionName(), row, family,
787
                qualifier, amount, writeToWAL);
789
                qualifier, amount, writeToWAL);
788
          }
790
          }
789
        }
791
        }
790
    );
792
    );
791
  }
793
  }
792

    
   
794

   
793
  /**
795
  /**
794
   * {@inheritDoc}
796
   * {@inheritDoc}
795
   */
797
   */
796
  @Override
798
  @Override
797
  public boolean checkAndPut(final byte [] row,
799
  public boolean checkAndPut(final byte [] row,
798
      final byte [] family, final byte [] qualifier, final byte [] value,
800
      final byte [] family, final byte [] qualifier, final byte [] value,
799
      final Put put)
801
      final Put put)
800
  throws IOException {
802
  throws IOException {
801
    return connection.getRegionServerWithRetries(
803
    return connection.getRegionServerWithRetries(
802
        new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
804
        new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
803
          public Boolean call() throws IOException {
805
          public Boolean call() throws IOException {
804
            return server.checkAndPut(location.getRegionInfo().getRegionName(),
806
            return server.checkAndPut(location.getRegionInfo().getRegionName(),
805
                row, family, qualifier, value, put) ? Boolean.TRUE : Boolean.FALSE;
807
                row, family, qualifier, value, put) ? Boolean.TRUE : Boolean.FALSE;
806
          }
808
          }
807
        }
809
        }
808
    );
810
    );
809
  }
811
  }
810

    
   
812

   
811

    
   
813

   
812
  /**
814
  /**
813
   * {@inheritDoc}
815
   * {@inheritDoc}
814
   */
816
   */
815
  @Override
817
  @Override
816
  public boolean checkAndDelete(final byte [] row,
818
  public boolean checkAndDelete(final byte [] row,
817
      final byte [] family, final byte [] qualifier, final byte [] value,
819
      final byte [] family, final byte [] qualifier, final byte [] value,
818
      final Delete delete)
820
      final Delete delete)
819
  throws IOException {
821
  throws IOException {
820
    return connection.getRegionServerWithRetries(
822
    return connection.getRegionServerWithRetries(
821
        new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
823
        new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
822
          public Boolean call() throws IOException {
824
          public Boolean call() throws IOException {
823
            return server.checkAndDelete(
825
            return server.checkAndDelete(
824
                location.getRegionInfo().getRegionName(),
826
                location.getRegionInfo().getRegionName(),
825
                row, family, qualifier, value, delete)
827
                row, family, qualifier, value, delete)
826
            ? Boolean.TRUE : Boolean.FALSE;
828
            ? Boolean.TRUE : Boolean.FALSE;
827
          }
829
          }
828
        }
830
        }
829
    );
831
    );
830
  }
832
  }
831

    
   
833

   
832
  /**
834
  /**
833
   * {@inheritDoc}
835
   * {@inheritDoc}
834
   */
836
   */
835
  @Override
837
  @Override
836
  public boolean exists(final Get get) throws IOException {
838
  public boolean exists(final Get get) throws IOException {
837
    return connection.getRegionServerWithRetries(
839
    return connection.getRegionServerWithRetries(
838
        new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
840
        new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
839
          public Boolean call() throws IOException {
841
          public Boolean call() throws IOException {
840
            return server.
842
            return server.
841
                exists(location.getRegionInfo().getRegionName(), get);
843
                exists(location.getRegionInfo().getRegionName(), get);
842
          }
844
          }
843
        }
845
        }
844
    );
846
    );
845
  }
847
  }
846

    
   
848

   
847
  /**
849
  /**
848
   * {@inheritDoc}
850
   * {@inheritDoc}
849
   */
851
   */
850
  @Override
852
  @Override
851
  public void flushCommits() throws IOException {
853
  public void flushCommits() throws IOException {
852
    try {
854
    try {
853
      connection.processBatchOfPuts(writeBuffer, tableName, pool);
855
      connection.processBatchOfPuts(writeBuffer, tableName, pool);
854
    } finally {
856
    } finally {
855
      if (clearBufferOnFail) {
857
      if (clearBufferOnFail) {
856
        writeBuffer.clear();
858
        writeBuffer.clear();
857
        currentWriteBufferSize = 0;
859
        currentWriteBufferSize = 0;
858
      } else {
860
      } else {
859
        // the write buffer was adjusted by processBatchOfPuts
861
        // the write buffer was adjusted by processBatchOfPuts
860
        currentWriteBufferSize = 0;
862
        currentWriteBufferSize = 0;
861
        for (Put aPut : writeBuffer) {
863
        for (Put aPut : writeBuffer) {
862
          currentWriteBufferSize += aPut.heapSize();
864
          currentWriteBufferSize += aPut.heapSize();
863
        }
865
        }
864
      }
866
      }
865
    }
867
    }
866
  }
868
  }
867

    
   
869

   
868
  /**
870
  /**
869
   * {@inheritDoc}
871
   * {@inheritDoc}
870
   */
872
   */
871
  @Override
873
  @Override
872
  public void close() throws IOException {
874
  public void close() throws IOException {
873
    if (this.closed) {
875
    if (this.closed) {
874
      return;
876
      return;
875
    }
877
    }
876
    flushCommits();
878
    flushCommits();
877
    this.pool.shutdown();
879
    this.pool.shutdown();
878
    if (this.connection != null) {
880
    if (this.connection != null) {
879
      this.connection.close();
881
      this.connection.close();
880
    }
882
    }
881
    this.closed = true;
883
    this.closed = true;
882
  }
884
  }
883

    
   
885

   
884
  // validate for well-formedness
886
  // validate for well-formedness
885
  private void validatePut(final Put put) throws IllegalArgumentException{
887
  private void validatePut(final Put put) throws IllegalArgumentException{
886
    if (put.isEmpty()) {
888
    if (put.isEmpty()) {
887
      throw new IllegalArgumentException("No columns to insert");
889
      throw new IllegalArgumentException("No columns to insert");
888
    }
890
    }
889
    if (maxKeyValueSize > 0) {
891
    if (maxKeyValueSize > 0) {
890
      for (List<KeyValue> list : put.getFamilyMap().values()) {
892
      for (List<KeyValue> list : put.getFamilyMap().values()) {
891
        for (KeyValue kv : list) {
893
        for (KeyValue kv : list) {
892
          if (kv.getLength() > maxKeyValueSize) {
894
          if (kv.getLength() > maxKeyValueSize) {
893
            throw new IllegalArgumentException("KeyValue size too large");
895
            throw new IllegalArgumentException("KeyValue size too large");
894
          }
896
          }
895
        }
897
        }
896
      }
898
      }
897
    }
899
    }
898
  }
900
  }
899

    
   
901

   
900
  /**
902
  /**
901
   * {@inheritDoc}
903
   * {@inheritDoc}
902
   */
904
   */
903
  @Override
905
  @Override
904
  public RowLock lockRow(final byte [] row)
906
  public RowLock lockRow(final byte [] row)
905
  throws IOException {
907
  throws IOException {
906
    return connection.getRegionServerWithRetries(
908
    return connection.getRegionServerWithRetries(
907
      new ServerCallable<RowLock>(connection, tableName, row, operationTimeout) {
909
      new ServerCallable<RowLock>(connection, tableName, row, operationTimeout) {
908
        public RowLock call() throws IOException {
910
        public RowLock call() throws IOException {
909
          long lockId =
911
          long lockId =
910
              server.lockRow(location.getRegionInfo().getRegionName(), row);
912
              server.lockRow(location.getRegionInfo().getRegionName(), row);
911
          return new RowLock(row,lockId);
913
          return new RowLock(row,lockId);
912
        }
914
        }
913
      }
915
      }
914
    );
916
    );
915
  }
917
  }
916

    
   
918

   
917
  /**
919
  /**
918
   * {@inheritDoc}
920
   * {@inheritDoc}
919
   */
921
   */
920
  @Override
922
  @Override
921
  public void unlockRow(final RowLock rl)
923
  public void unlockRow(final RowLock rl)
922
  throws IOException {
924
  throws IOException {
923
    connection.getRegionServerWithRetries(
925
    connection.getRegionServerWithRetries(
924
      new ServerCallable<Boolean>(connection, tableName, rl.getRow(), operationTimeout) {
926
      new ServerCallable<Boolean>(connection, tableName, rl.getRow(), operationTimeout) {
925
        public Boolean call() throws IOException {
927
        public Boolean call() throws IOException {
926
          server.unlockRow(location.getRegionInfo().getRegionName(),
928
          server.unlockRow(location.getRegionInfo().getRegionName(),
927
              rl.getLockId());
929
              rl.getLockId());
928
          return null; // FindBugs NP_BOOLEAN_RETURN_NULL
930
          return null; // FindBugs NP_BOOLEAN_RETURN_NULL
929
        }
931
        }
930
      }
932
      }
931
    );
933
    );
932
  }
934
  }
933

    
   
935

   
934
  /**
936
  /**
935
   * {@inheritDoc}
937
   * {@inheritDoc}
936
   */
938
   */
937
  @Override
939
  @Override
938
  public boolean isAutoFlush() {
940
  public boolean isAutoFlush() {
939
    return autoFlush;
941
    return autoFlush;
940
  }
942
  }
941

    
   
943

   
942
  /**
944
  /**
943
   * See {@link #setAutoFlush(boolean, boolean)}
945
   * See {@link #setAutoFlush(boolean, boolean)}
944
   *
946
   *
945
   * @param autoFlush
947
   * @param autoFlush
946
   *          Whether or not to enable 'auto-flush'.
948
   *          Whether or not to enable 'auto-flush'.
947
   */
949
   */
948
  public void setAutoFlush(boolean autoFlush) {
950
  public void setAutoFlush(boolean autoFlush) {
949
    setAutoFlush(autoFlush, autoFlush);
951
    setAutoFlush(autoFlush, autoFlush);
950
  }
952
  }
951

    
   
953

   
952
  /**
954
  /**
953
   * Turns 'auto-flush' on or off.
955
   * Turns 'auto-flush' on or off.
954
   * <p>
956
   * <p>
955
   * When enabled (default), {@link Put} operations don't get buffered/delayed
957
   * When enabled (default), {@link Put} operations don't get buffered/delayed
956
   * and are immediately executed. Failed operations are not retried. This is
958
   * and are immediately executed. Failed operations are not retried. This is
957
   * slower but safer.
959
   * slower but safer.
958
   * <p>
960
   * <p>
959
   * Turning off {@link #autoFlush} means that multiple {@link Put}s will be
961
   * Turning off {@link #autoFlush} means that multiple {@link Put}s will be
960
   * accepted before any RPC is actually sent to do the write operations. If the
962
   * accepted before any RPC is actually sent to do the write operations. If the
961
   * application dies before pending writes get flushed to HBase, data will be
963
   * application dies before pending writes get flushed to HBase, data will be
962
   * lost.
964
   * lost.
963
   * <p>
965
   * <p>
964
   * When you turn {@link #autoFlush} off, you should also consider the
966
   * When you turn {@link #autoFlush} off, you should also consider the
965
   * {@link #clearBufferOnFail} option. By default, asynchronous {@link Put}
967
   * {@link #clearBufferOnFail} option. By default, asynchronous {@link Put}
966
   * requests will be retried on failure until successful. However, this can
968
   * requests will be retried on failure until successful. However, this can
967
   * pollute the writeBuffer and slow down batching performance. Additionally,
969
   * pollute the writeBuffer and slow down batching performance. Additionally,
968
   * you may want to issue a number of Put requests and call
970
   * you may want to issue a number of Put requests and call
969
   * {@link #flushCommits()} as a barrier. In both use cases, consider setting
971
   * {@link #flushCommits()} as a barrier. In both use cases, consider setting
970
   * clearBufferOnFail to true to erase the buffer after {@link #flushCommits()}
972
   * clearBufferOnFail to true to erase the buffer after {@link #flushCommits()}
971
   * has been called, regardless of success.
973
   * has been called, regardless of success.
972
   *
974
   *
973
   * @param autoFlush
975
   * @param autoFlush
974
   *          Whether or not to enable 'auto-flush'.
976
   *          Whether or not to enable 'auto-flush'.
975
   * @param clearBufferOnFail
977
   * @param clearBufferOnFail
976
   *          Whether to keep Put failures in the writeBuffer
978
   *          Whether to keep Put failures in the writeBuffer
977
   * @see #flushCommits
979
   * @see #flushCommits
978
   */
980
   */
979
  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
981
  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
980
    this.autoFlush = autoFlush;
982
    this.autoFlush = autoFlush;
981
    this.clearBufferOnFail = autoFlush || clearBufferOnFail;
983
    this.clearBufferOnFail = autoFlush || clearBufferOnFail;
982
  }
984
  }
983

    
   
985

   
984
  /**
986
  /**
985
   * Returns the maximum size in bytes of the write buffer for this HTable.
987
   * Returns the maximum size in bytes of the write buffer for this HTable.
986
   * <p>
988
   * <p>
987
   * The default value comes from the configuration parameter
989
   * The default value comes from the configuration parameter
988
   * {@code hbase.client.write.buffer}.
990
   * {@code hbase.client.write.buffer}.
989
   * @return The size of the write buffer in bytes.
991
   * @return The size of the write buffer in bytes.
990
   */
992
   */
991
  public long getWriteBufferSize() {
993
  public long getWriteBufferSize() {
992
    return writeBufferSize;
994
    return writeBufferSize;
993
  }
995
  }
994

    
   
996

   
995
  /**
997
  /**
996
   * Sets the size of the buffer in bytes.
998
   * Sets the size of the buffer in bytes.
997
   * <p>
999
   * <p>
998
   * If the new size is less than the current amount of data in the
1000
   * If the new size is less than the current amount of data in the
999
   * write buffer, the buffer gets flushed.
1001
   * write buffer, the buffer gets flushed.
1000
   * @param writeBufferSize The new write buffer size, in bytes.
1002
   * @param writeBufferSize The new write buffer size, in bytes.
1001
   * @throws IOException if a remote or network exception occurs.
1003
   * @throws IOException if a remote or network exception occurs.
1002
   */
1004
   */
1003
  public void setWriteBufferSize(long writeBufferSize) throws IOException {
1005
  public void setWriteBufferSize(long writeBufferSize) throws IOException {
1004
    this.writeBufferSize = writeBufferSize;
1006
    this.writeBufferSize = writeBufferSize;
1005
    if(currentWriteBufferSize > writeBufferSize) {
1007
    if(currentWriteBufferSize > writeBufferSize) {
1006
      flushCommits();
1008
      flushCommits();
1007
    }
1009
    }
1008
  }
1010
  }
1009

    
   
1011

   
1010
  /**
1012
  /**
1011
   * Returns the write buffer.
1013
   * Returns the write buffer.
1012
   * @return The current write buffer.
1014
   * @return The current write buffer.
1013
   */
1015
   */
1014
  public ArrayList<Put> getWriteBuffer() {
1016
  public ArrayList<Put> getWriteBuffer() {
1015
    return writeBuffer;
1017
    return writeBuffer;
1016
  }
1018
  }
1017

    
   
1019

   
1018
  /**
1020
  /**
1019
   * Implements the scanner interface for the HBase client.
1021
   * Implements the scanner interface for the HBase client.
1020
   * If there are multiple regions in a table, this scanner will iterate
1022
   * If there are multiple regions in a table, this scanner will iterate
1021
   * through them all.
1023
   * through them all.
1022
   */
1024
   */
1023
  protected class ClientScanner implements ResultScanner {
1025
  protected class ClientScanner implements ResultScanner {
1024
    private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
1026
    private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
1025
    // HEADSUP: The scan internal start row can change as we move through table.
1027
    // HEADSUP: The scan internal start row can change as we move through table.
1026
    private Scan scan;
1028
    private Scan scan;
1027
    private boolean closed = false;
1029
    private boolean closed = false;
1028
    // Current region scanner is against.  Gets cleared if current region goes
1030
    // Current region scanner is against.  Gets cleared if current region goes
1029
    // wonky: e.g. if it splits on us.
1031
    // wonky: e.g. if it splits on us.
1030
    private HRegionInfo currentRegion = null;
1032
    private HRegionInfo currentRegion = null;
1031
    private ScannerCallable callable = null;
1033
    private ScannerCallable callable = null;
1032
    private final LinkedList<Result> cache = new LinkedList<Result>();
1034
    private final LinkedList<Result> cache = new LinkedList<Result>();
1033
    private final int caching;
1035
    private final int caching;
1034
    private long lastNext;
1036
    private long lastNext;
1035
    // Keep lastResult returned successfully in case we have to reset scanner.
1037
    // Keep lastResult returned successfully in case we have to reset scanner.
1036
    private Result lastResult = null;
1038
    private Result lastResult = null;

    
   
1039
    private ScanMetrics scanMetrics = null;
1037

    
   
1040

   
1038
    protected ClientScanner(final Scan scan) {
1041
    protected ClientScanner(final Scan scan) {
1039
      if (CLIENT_LOG.isDebugEnabled()) {
1042
      if (CLIENT_LOG.isDebugEnabled()) {
1040
        CLIENT_LOG.debug("Creating scanner over "
1043
        CLIENT_LOG.debug("Creating scanner over "
1041
            + Bytes.toString(getTableName())
1044
            + Bytes.toString(getTableName())
1042
            + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
1045
            + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
1043
      }
1046
      }
1044
      this.scan = scan;
1047
      this.scan = scan;
1045
      this.lastNext = System.currentTimeMillis();
1048
      this.lastNext = System.currentTimeMillis();
1046

    
   
1049

   

    
   
1050
      // check if application wants to collect scan metrics

    
   
1051
      byte[] enableMetrics = scan.getAttribute(

    
   
1052
        Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);

    
   
1053
      if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {

    
   
1054
        scanMetrics = new ScanMetrics();

    
   
1055
      }

    
   
1056

   
1047
      // Use the caching from the Scan.  If not set, use the default cache setting for this table.
1057
      // Use the caching from the Scan.  If not set, use the default cache setting for this table.
1048
      if (this.scan.getCaching() > 0) {
1058
      if (this.scan.getCaching() > 0) {
1049
        this.caching = this.scan.getCaching();
1059
        this.caching = this.scan.getCaching();
1050
      } else {
1060
      } else {
1051
        this.caching = HTable.this.scannerCaching;
1061
        this.caching = HTable.this.scannerCaching;
1052
      }
1062
      }
1053

    
   
1063

   
1054
      // Removed filter validation.  We have a new format now, only one of all
1064
      // Removed filter validation.  We have a new format now, only one of all
1055
      // the current filters has a validate() method.  We can add it back,
1065
      // the current filters has a validate() method.  We can add it back,
1056
      // need to decide on what we're going to do re: filter redesign.
1066
      // need to decide on what we're going to do re: filter redesign.
1057
      // Need, at the least, to break up family from qualifier as separate
1067
      // Need, at the least, to break up family from qualifier as separate
1058
      // checks, I think it's important server-side filters are optimal in that
1068
      // checks, I think it's important server-side filters are optimal in that
1059
      // respect.
1069
      // respect.
1060
    }
1070
    }
1061

    
   
1071

   
1062
    public void initialize() throws IOException {
1072
    public void initialize() throws IOException {
1063
      nextScanner(this.caching, false);
1073
      nextScanner(this.caching, false);
1064
    }
1074
    }
1065

    
   
1075

   
1066
    protected Scan getScan() {
1076
    protected Scan getScan() {
1067
      return scan;
1077
      return scan;
1068
    }
1078
    }
1069

    
   
1079

   
1070
    protected long getTimestamp() {
1080
    protected long getTimestamp() {
1071
      return lastNext;
1081
      return lastNext;
1072
    }
1082
    }
1073

    
   
1083

   
1074
    // returns true if the passed region endKey
1084
    // returns true if the passed region endKey
1075
    private boolean checkScanStopRow(final byte [] endKey) {
1085
    private boolean checkScanStopRow(final byte [] endKey) {
1076
      if (this.scan.getStopRow().length > 0) {
1086
      if (this.scan.getStopRow().length > 0) {
1077
        // there is a stop row, check to see if we are past it.
1087
        // there is a stop row, check to see if we are past it.
1078
        byte [] stopRow = scan.getStopRow();
1088
        byte [] stopRow = scan.getStopRow();
1079
        int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
1089
        int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
1080
          endKey, 0, endKey.length);
1090
          endKey, 0, endKey.length);
1081
        if (cmp <= 0) {
1091
        if (cmp <= 0) {
1082
          // stopRow <= endKey (endKey is equals to or larger than stopRow)
1092
          // stopRow <= endKey (endKey is equals to or larger than stopRow)
1083
          // This is a stop.
1093
          // This is a stop.
1084
          return true;
1094
          return true;
1085
        }
1095
        }
1086
      }
1096
      }
1087
      return false; //unlikely.
1097
      return false; //unlikely.
1088
    }
1098
    }
1089

    
   
1099

   
1090
    /*
1100
    /*
1091
     * Gets a scanner for the next region.  If this.currentRegion != null, then
1101
     * Gets a scanner for the next region.  If this.currentRegion != null, then
1092
     * we will move to the endrow of this.currentRegion.  Else we will get
1102
     * we will move to the endrow of this.currentRegion.  Else we will get
1093
     * scanner at the scan.getStartRow().  We will go no further, just tidy
1103
     * scanner at the scan.getStartRow().  We will go no further, just tidy
1094
     * up outstanding scanners, if <code>currentRegion != null</code> and
1104
     * up outstanding scanners, if <code>currentRegion != null</code> and
1095
     * <code>done</code> is true.
1105
     * <code>done</code> is true.
1096
     * @param nbRows
1106
     * @param nbRows
1097
     * @param done Server-side says we're done scanning.
1107
     * @param done Server-side says we're done scanning.
1098
     */
1108
     */
1099
    private boolean nextScanner(int nbRows, final boolean done)
1109
    private boolean nextScanner(int nbRows, final boolean done)
1100
    throws IOException {
1110
    throws IOException {
1101
      // Close the previous scanner if it's open
1111
      // Close the previous scanner if it's open
1102
      if (this.callable != null) {
1112
      if (this.callable != null) {
1103
        this.callable.setClose();
1113
        this.callable.setClose();
1104
        getConnection().getRegionServerWithRetries(callable);
1114
        getConnection().getRegionServerWithRetries(callable);
1105
        this.callable = null;
1115
        this.callable = null;
1106
      }
1116
      }
1107

    
   
1117

   
1108
      // Where to start the next scanner
1118
      // Where to start the next scanner
1109
      byte [] localStartKey;
1119
      byte [] localStartKey;
1110

    
   
1120

   
1111
      // if we're at end of table, close and return false to stop iterating
1121
      // if we're at end of table, close and return false to stop iterating
1112
      if (this.currentRegion != null) {
1122
      if (this.currentRegion != null) {
1113
        byte [] endKey = this.currentRegion.getEndKey();
1123
        byte [] endKey = this.currentRegion.getEndKey();
1114
        if (endKey == null ||
1124
        if (endKey == null ||
1115
            Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
1125
            Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
1116
            checkScanStopRow(endKey) ||
1126
            checkScanStopRow(endKey) ||
1117
            done) {
1127
            done) {
1118
          close();
1128
          close();
1119
          if (CLIENT_LOG.isDebugEnabled()) {
1129
          if (CLIENT_LOG.isDebugEnabled()) {
1120
            CLIENT_LOG.debug("Finished with scanning at " + this.currentRegion);
1130
            CLIENT_LOG.debug("Finished with scanning at " + this.currentRegion);
1121
          }
1131
          }
1122
          return false;
1132
          return false;
1123
        }
1133
        }
1124
        localStartKey = endKey;
1134
        localStartKey = endKey;
1125
        if (CLIENT_LOG.isDebugEnabled()) {
1135
        if (CLIENT_LOG.isDebugEnabled()) {
1126
          CLIENT_LOG.debug("Finished with region " + this.currentRegion);
1136
          CLIENT_LOG.debug("Finished with region " + this.currentRegion);
1127
        }
1137
        }
1128
      } else {
1138
      } else {
1129
        localStartKey = this.scan.getStartRow();
1139
        localStartKey = this.scan.getStartRow();
1130
      }
1140
      }
1131

    
   
1141

   
1132
      if (CLIENT_LOG.isDebugEnabled()) {
1142
      if (CLIENT_LOG.isDebugEnabled()) {
1133
        CLIENT_LOG.debug("Advancing internal scanner to startKey at '" +
1143
        CLIENT_LOG.debug("Advancing internal scanner to startKey at '" +
1134
          Bytes.toStringBinary(localStartKey) + "'");
1144
          Bytes.toStringBinary(localStartKey) + "'");
1135
      }
1145
      }
1136
      try {
1146
      try {
1137
        callable = getScannerCallable(localStartKey, nbRows);
1147
        callable = getScannerCallable(localStartKey, nbRows);
1138
        // Open a scanner on the region server starting at the
1148
        // Open a scanner on the region server starting at the
1139
        // beginning of the region
1149
        // beginning of the region
1140
        getConnection().getRegionServerWithRetries(callable);
1150
        getConnection().getRegionServerWithRetries(callable);
1141
        this.currentRegion = callable.getHRegionInfo();
1151
        this.currentRegion = callable.getHRegionInfo();

    
   
1152
        if (this.scanMetrics != null) {

    
   
1153
          this.scanMetrics.countOfRegions.inc();

    
   
1154
        }
1142
      } catch (IOException e) {
1155
      } catch (IOException e) {
1143
        close();
1156
        close();
1144
        throw e;
1157
        throw e;
1145
      }
1158
      }
1146
      return true;
1159
      return true;
1147
    }
1160
    }
1148

    
   
1161

   
1149
    protected ScannerCallable getScannerCallable(byte [] localStartKey,
1162
    protected ScannerCallable getScannerCallable(byte [] localStartKey,
1150
        int nbRows) {
1163
        int nbRows) {
1151
      scan.setStartRow(localStartKey);
1164
      scan.setStartRow(localStartKey);
1152
      ScannerCallable s = new ScannerCallable(getConnection(),
1165
      ScannerCallable s = new ScannerCallable(getConnection(),
1153
        getTableName(), scan);
1166
        getTableName(), scan, this.scanMetrics);
1154
      s.setCaching(nbRows);
1167
      s.setCaching(nbRows);
1155
      return s;
1168
      return s;
1156
    }
1169
    }
1157

    
   
1170

   

    
   
1171
    /**

    
   
1172
     * publish the scan metrics

    
   
1173
     * For now, we use scan.setAttribute to pass the metrics for application

    
   
1174
     * or TableInputFormat to consume

    
   
1175
     * Later, we could push it to other systems

    
   
1176
     * We don't use metrics framework because it doesn't support

    
   
1177
     * multi instances of the same metrics on the same machine; for scan/map

    
   
1178
     * reduce scenarios, we will have multiple scans running at the same time

    
   
1179
     */

    
   
1180
    private void writeScanMetrics() throws IOException

    
   
1181
    {

    
   
1182
      // by default, scanMetrics is null

    
   
1183
      // if application wants to collect scanMetrics, it can turn it on by

    
   
1184
      // calling scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE,

    
   
1185
      // Bytes.toBytes(Boolean.TRUE))

    
   
1186
      if (this.scanMetrics == null) {

    
   
1187
        return;

    
   
1188
      }

    
   
1189
      final DataOutputBuffer d = new DataOutputBuffer();

    
   
1190
      scanMetrics.write(d);

    
   
1191
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, d.getData());

    
   
1192
    }

    
   
1193

   
1158
    public Result next() throws IOException {
1194
    public Result next() throws IOException {
1159
      // If the scanner is closed but there is some rows left in the cache,
1195
      // If the scanner is closed but there is some rows left in the cache,
1160
      // it will first empty it before returning null
1196
      // it will first empty it before returning null
1161
      if (cache.size() == 0 && this.closed) {
1197
      if (cache.size() == 0 && this.closed) {

    
   
1198
        writeScanMetrics();
1162
        return null;
1199
        return null;
1163
      }
1200
      }
1164
      if (cache.size() == 0) {
1201
      if (cache.size() == 0) {
1165
        Result [] values = null;
1202
        Result [] values = null;
1166
        long remainingResultSize = maxScannerResultSize;
1203
        long remainingResultSize = maxScannerResultSize;
1167
        int countdown = this.caching;
1204
        int countdown = this.caching;
1168
        // We need to reset it if it's a new callable that was created
1205
        // We need to reset it if it's a new callable that was created
1169
        // with a countdown in nextScanner
1206
        // with a countdown in nextScanner
1170
        callable.setCaching(this.caching);
1207
        callable.setCaching(this.caching);
1171
        // This flag is set when we want to skip the result returned.  We do
1208
        // This flag is set when we want to skip the result returned.  We do
1172
        // this when we reset scanner because it split under us.
1209
        // this when we reset scanner because it split under us.
1173
        boolean skipFirst = false;
1210
        boolean skipFirst = false;
1174
        do {
1211
        do {
1175
          try {
1212
          try {
1176
            if (skipFirst) {
1213
            if (skipFirst) {
1177
              // Skip only the first row (which was the last row of the last
1214
              // Skip only the first row (which was the last row of the last
1178
              // already-processed batch).
1215
              // already-processed batch).
1179
              callable.setCaching(1);
1216
              callable.setCaching(1);
1180
              values = getConnection().getRegionServerWithRetries(callable);
1217
              values = getConnection().getRegionServerWithRetries(callable);
1181
              callable.setCaching(this.caching);
1218
              callable.setCaching(this.caching);
1182
              skipFirst = false;
1219
              skipFirst = false;
1183
            }
1220
            }
1184
            // Server returns a null values if scanning is to stop.  Else,
1221
            // Server returns a null values if scanning is to stop.  Else,
1185
            // returns an empty array if scanning is to go on and we've just
1222
            // returns an empty array if scanning is to go on and we've just
1186
            // exhausted current region.
1223
            // exhausted current region.
1187
            values = getConnection().getRegionServerWithRetries(callable);
1224
            values = getConnection().getRegionServerWithRetries(callable);
1188
          } catch (DoNotRetryIOException e) {
1225
          } catch (DoNotRetryIOException e) {
1189
            if (e instanceof UnknownScannerException) {
1226
            if (e instanceof UnknownScannerException) {
1190
              long timeout = lastNext + scannerTimeout;
1227
              long timeout = lastNext + scannerTimeout;
1191
              // If we are over the timeout, throw this exception to the client
1228
              // If we are over the timeout, throw this exception to the client
1192
              // Else, it's because the region moved and we used the old id
1229
              // Else, it's because the region moved and we used the old id
1193
              // against the new region server; reset the scanner.
1230
              // against the new region server; reset the scanner.
1194
              if (timeout < System.currentTimeMillis()) {
1231
              if (timeout < System.currentTimeMillis()) {
1195
                long elapsed = System.currentTimeMillis() - lastNext;
1232
                long elapsed = System.currentTimeMillis() - lastNext;
1196
                ScannerTimeoutException ex = new ScannerTimeoutException(
1233
                ScannerTimeoutException ex = new ScannerTimeoutException(
1197
                    elapsed + "ms passed since the last invocation, " +
1234
                    elapsed + "ms passed since the last invocation, " +
1198
                        "timeout is currently set to " + scannerTimeout);
1235
                        "timeout is currently set to " + scannerTimeout);
1199
                ex.initCause(e);
1236
                ex.initCause(e);
1200
                throw ex;
1237
                throw ex;
1201
              }
1238
              }
1202
            } else {
1239
            } else {
1203
              Throwable cause = e.getCause();
1240
              Throwable cause = e.getCause();
1204
              if (cause == null || !(cause instanceof NotServingRegionException)) {
1241
              if (cause == null || !(cause instanceof NotServingRegionException)) {
1205
                throw e;
1242
                throw e;
1206
              }
1243
              }
1207
            }
1244
            }
1208
            // Else, its signal from depths of ScannerCallable that we got an
1245
            // Else, its signal from depths of ScannerCallable that we got an
1209
            // NSRE on a next and that we need to reset the scanner.
1246
            // NSRE on a next and that we need to reset the scanner.
1210
            if (this.lastResult != null) {
1247
            if (this.lastResult != null) {
1211
              this.scan.setStartRow(this.lastResult.getRow());
1248
              this.scan.setStartRow(this.lastResult.getRow());
1212
              // Skip first row returned.  We already let it out on previous
1249
              // Skip first row returned.  We already let it out on previous
1213
              // invocation.
1250
              // invocation.
1214
              skipFirst = true;
1251
              skipFirst = true;
1215
            }
1252
            }
1216
            // Clear region
1253
            // Clear region
1217
            this.currentRegion = null;
1254
            this.currentRegion = null;
1218
            continue;
1255
            continue;
1219
          }
1256
          }
1220
          lastNext = System.currentTimeMillis();
1257
          long currentTime = System.currentTimeMillis();

    
   
1258
          if (this.scanMetrics != null ) {

    
   
1259
            this.scanMetrics.sumOfMillisSecBetweenNexts.inc(

    
   
1260
              currentTime-lastNext);

    
   
1261
          }

    
   
1262
          lastNext = currentTime;
1221
          if (values != null && values.length > 0) {
1263
          if (values != null && values.length > 0) {
1222
            for (Result rs : values) {
1264
            for (Result rs : values) {
1223
              cache.add(rs);
1265
              cache.add(rs);
1224
              for (KeyValue kv : rs.raw()) {
1266
              for (KeyValue kv : rs.raw()) {
1225
                  remainingResultSize -= kv.heapSize();
1267
                  remainingResultSize -= kv.heapSize();
1226
              }
1268
              }
1227
              countdown--;
1269
              countdown--;
1228
              this.lastResult = rs;
1270
              this.lastResult = rs;
1229
            }
1271
            }
1230
          }
1272
          }
1231
          // Values == null means server-side filter has determined we must STOP
1273
          // Values == null means server-side filter has determined we must STOP
1232
        } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
1274
        } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
1233
      }
1275
      }
1234

    
   
1276

   
1235
      if (cache.size() > 0) {
1277
      if (cache.size() > 0) {
1236
        return cache.poll();
1278
        return cache.poll();
1237
      }
1279
      }

    
   
1280
      writeScanMetrics();
1238
      return null;
1281
      return null;
1239
    }
1282
    }
1240

    
   
1283

   
1241
    /**
1284
    /**
1242
     * Get <param>nbRows</param> rows.
1285
     * Get <param>nbRows</param> rows.
1243
     * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
1286
     * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
1244
     * setting (or hbase.client.scanner.caching in hbase-site.xml).
1287
     * setting (or hbase.client.scanner.caching in hbase-site.xml).
1245
     * @param nbRows number of rows to return
1288
     * @param nbRows number of rows to return
1246
     * @return Between zero and <param>nbRows</param> RowResults.  Scan is done
1289
     * @return Between zero and <param>nbRows</param> RowResults.  Scan is done
1247
     * if returned array is of zero-length (We never return null).
1290
     * if returned array is of zero-length (We never return null).
1248
     * @throws IOException
1291
     * @throws IOException
1249
     */
1292
     */
1250
    public Result [] next(int nbRows) throws IOException {
1293
    public Result [] next(int nbRows) throws IOException {
1251
      // Collect values to be returned here
1294
      // Collect values to be returned here
1252
      ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
1295
      ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
1253
      for(int i = 0; i < nbRows; i++) {
1296
      for(int i = 0; i < nbRows; i++) {
1254
        Result next = next();
1297
        Result next = next();
1255
        if (next != null) {
1298
        if (next != null) {
1256
          resultSets.add(next);
1299
          resultSets.add(next);
1257
        } else {
1300
        } else {
1258
          break;
1301
          break;
1259
        }
1302
        }
1260
      }
1303
      }
1261
      return resultSets.toArray(new Result[resultSets.size()]);
1304
      return resultSets.toArray(new Result[resultSets.size()]);
1262
    }
1305
    }
1263

    
   
1306

   
1264
    public void close() {
1307
    public void close() {
1265
      if (callable != null) {
1308
      if (callable != null) {
1266
        callable.setClose();
1309
        callable.setClose();
1267
        try {
1310
        try {
1268
          getConnection().getRegionServerWithRetries(callable);
1311
          getConnection().getRegionServerWithRetries(callable);
1269
        } catch (IOException e) {
1312
        } catch (IOException e) {
1270
          // We used to catch this error, interpret, and rethrow. However, we
1313
          // We used to catch this error, interpret, and rethrow. However, we
1271
          // have since decided that it's not nice for a scanner's close to
1314
          // have since decided that it's not nice for a scanner's close to
1272
          // throw exceptions. Chances are it was just an UnknownScanner
1315
          // throw exceptions. Chances are it was just an UnknownScanner
1273
          // exception due to lease time out.
1316
          // exception due to lease time out.
1274
        }
1317
        }
1275
        callable = null;
1318
        callable = null;
1276
      }
1319
      }
1277
      closed = true;
1320
      closed = true;
1278
    }
1321
    }
1279

    
   
1322

   
1280
    public Iterator<Result> iterator() {
1323
    public Iterator<Result> iterator() {
1281
      return new Iterator<Result>() {
1324
      return new Iterator<Result>() {
1282
        // The next RowResult, possibly pre-read
1325
        // The next RowResult, possibly pre-read
1283
        Result next = null;
1326
        Result next = null;
1284

    
   
1327

   
1285
        // return true if there is another item pending, false if there isn't.
1328
        // return true if there is another item pending, false if there isn't.
1286
        // this method is where the actual advancing takes place, but you need
1329
        // this method is where the actual advancing takes place, but you need
1287
        // to call next() to consume it. hasNext() will only advance if there
1330
        // to call next() to consume it. hasNext() will only advance if there
1288
        // isn't a pending next().
1331
        // isn't a pending next().
1289
        public boolean hasNext() {
1332
        public boolean hasNext() {
1290
          if (next == null) {
1333
          if (next == null) {
1291
            try {
1334
            try {
1292
              next = ClientScanner.this.next();
1335
              next = ClientScanner.this.next();
1293
              return next != null;
1336
              return next != null;
1294
            } catch (IOException e) {
1337
            } catch (IOException e) {
1295
              throw new RuntimeException(e);
1338
              throw new RuntimeException(e);
1296
            }
1339
            }
1297
          }
1340
          }
1298
          return true;
1341
          return true;
1299
        }
1342
        }
1300

    
   
1343

   
1301
        // get the pending next item and advance the iterator. returns null if
1344
        // get the pending next item and advance the iterator. returns null if
1302
        // there is no next item.
1345
        // there is no next item.
1303
        public Result next() {
1346
        public Result next() {
1304
          // since hasNext() does the real advancing, we call this to determine
1347
          // since hasNext() does the real advancing, we call this to determine
1305
          // if there is a next before proceeding.
1348
          // if there is a next before proceeding.
1306
          if (!hasNext()) {
1349
          if (!hasNext()) {
1307
            return null;
1350
            return null;
1308
          }
1351
          }
1309

    
   
1352

   
1310
          // if we get to here, then hasNext() has given us an item to return.
1353
          // if we get to here, then hasNext() has given us an item to return.
1311
          // we want to return the item and then null out the next pointer, so
1354
          // we want to return the item and then null out the next pointer, so
1312
          // we use a temporary variable.
1355
          // we use a temporary variable.
1313
          Result temp = next;
1356
          Result temp = next;
1314
          next = null;
1357
          next = null;
1315
          return temp;
1358
          return temp;
1316
        }
1359
        }
1317

    
   
1360

   
1318
        public void remove() {
1361
        public void remove() {
1319
          throw new UnsupportedOperationException();
1362
          throw new UnsupportedOperationException();
1320
        }
1363
        }
1321
      };
1364
      };
1322
    }
1365
    }
1323
  }
1366
  }
1324

    
   
1367

   
1325
  /**
1368
  /**
1326
   * The pool is used for multi requests for this HTable
1369
   * The pool is used for multi requests for this HTable
1327
   * @return the pool used for multi
1370
   * @return the pool used for multi
1328
   */
1371
   */
1329
  ExecutorService getPool() {
1372
  ExecutorService getPool() {
1330
    return this.pool;
1373
    return this.pool;
1331
  }
1374
  }
1332

    
   
1375

   
1333
  static class DaemonThreadFactory implements ThreadFactory {
1376
  static class DaemonThreadFactory implements ThreadFactory {
1334
    static final AtomicInteger poolNumber = new AtomicInteger(1);
1377
    static final AtomicInteger poolNumber = new AtomicInteger(1);
1335
        final ThreadGroup group;
1378
        final ThreadGroup group;
1336
        final AtomicInteger threadNumber = new AtomicInteger(1);
1379
        final AtomicInteger threadNumber = new AtomicInteger(1);
1337
        final String namePrefix;
1380
        final String namePrefix;
1338

    
   
1381

   
1339
        DaemonThreadFactory() {
1382
        DaemonThreadFactory() {
1340
            SecurityManager s = System.getSecurityManager();
1383
            SecurityManager s = System.getSecurityManager();
1341
            group = (s != null)? s.getThreadGroup() :
1384
            group = (s != null)? s.getThreadGroup() :
1342
                                 Thread.currentThread().getThreadGroup();
1385
                                 Thread.currentThread().getThreadGroup();
1343
            namePrefix = "pool-" +
1386
            namePrefix = "pool-" +
1344
                          poolNumber.getAndIncrement() +
1387
                          poolNumber.getAndIncrement() +
1345
                         "-thread-";
1388
                         "-thread-";
1346
        }
1389
        }
1347

    
   
1390

   
1348
        public Thread newThread(Runnable r) {
1391
        public Thread newThread(Runnable r) {
1349
            Thread t = new Thread(group, r,
1392
            Thread t = new Thread(group, r,
1350
                                  namePrefix + threadNumber.getAndIncrement(),
1393
                                  namePrefix + threadNumber.getAndIncrement(),
1351
                                  0);
1394
                                  0);
1352
            if (!t.isDaemon()) {
1395
            if (!t.isDaemon()) {
1353
              t.setDaemon(true);
1396
              t.setDaemon(true);
1354
            }
1397
            }
1355
            if (t.getPriority() != Thread.NORM_PRIORITY) {
1398
            if (t.getPriority() != Thread.NORM_PRIORITY) {
1356
              t.setPriority(Thread.NORM_PRIORITY);
1399
              t.setPriority(Thread.NORM_PRIORITY);
1357
            }
1400
            }
1358
            return t;
1401
            return t;
1359
        }
1402
        }
1360
  }
1403
  }
1361

    
   
1404

   
1362
  /**
1405
  /**
1363
   * Enable or disable region cache prefetch for the table. It will be
1406
   * Enable or disable region cache prefetch for the table. It will be
1364
   * applied to all HTable instances of the given table that share the same
1407
   * applied to all HTable instances of the given table that share the same
1365
   * connection. By default, the cache prefetch is enabled.
1408
   * connection. By default, the cache prefetch is enabled.
1366
   * @param tableName name of table to configure.
1409
   * @param tableName name of table to configure.
1367
   * @param enable Set to true to enable region cache prefetch. Or set to
1410
   * @param enable Set to true to enable region cache prefetch. Or set to
1368
   * false to disable it.
1411
   * false to disable it.
1369
   * @throws IOException
1412
   * @throws IOException
1370
   */
1413
   */
1371
  public static void setRegionCachePrefetch(final byte[] tableName,
1414
  public static void setRegionCachePrefetch(final byte[] tableName,
1372
      final boolean enable) throws IOException {
1415
      final boolean enable) throws IOException {
1373
    HConnectionManager.execute(new HConnectable<Void>(HBaseConfiguration
1416
    HConnectionManager.execute(new HConnectable<Void>(HBaseConfiguration
1374
        .create()) {
1417
        .create()) {
1375
      @Override
1418
      @Override
1376
      public Void connect(HConnection connection) throws IOException {
1419
      public Void connect(HConnection connection) throws IOException {
1377
        connection.setRegionCachePrefetch(tableName, enable);
1420
        connection.setRegionCachePrefetch(tableName, enable);
1378
        return null;
1421
        return null;
1379
      }
1422
      }
1380
    });
1423
    });
1381
  }
1424
  }
1382

    
   
1425

   
1383
  /**
1426
  /**
1384
   * Enable or disable region cache prefetch for the table. It will be
1427
   * Enable or disable region cache prefetch for the table. It will be
1385
   * applied for the given table's all HTable instances who share the same
1428
   * applied for the given table's all HTable instances who share the same
1386
   * connection. By default, the cache prefetch is enabled.
1429
   * connection. By default, the cache prefetch is enabled.
1387
   * @param conf The Configuration object to use.
1430
   * @param conf The Configuration object to use.
1388
   * @param tableName name of table to configure.
1431
   * @param tableName name of table to configure.
1389
   * @param enable Set to true to enable region cache prefetch. Or set to
1432
   * @param enable Set to true to enable region cache prefetch. Or set to
1390
   * false to disable it.
1433
   * false to disable it.
1391
   * @throws IOException
1434
   * @throws IOException
1392
   */
1435
   */
1393
  public static void setRegionCachePrefetch(final Configuration conf,
1436
  public static void setRegionCachePrefetch(final Configuration conf,
1394
      final byte[] tableName, final boolean enable) throws IOException {
1437
      final byte[] tableName, final boolean enable) throws IOException {
1395
    HConnectionManager.execute(new HConnectable<Void>(conf) {
1438
    HConnectionManager.execute(new HConnectable<Void>(conf) {
1396
      @Override
1439
      @Override
1397
      public Void connect(HConnection connection) throws IOException {
1440
      public Void connect(HConnection connection) throws IOException {
1398
        connection.setRegionCachePrefetch(tableName, enable);
1441
        connection.setRegionCachePrefetch(tableName, enable);
1399
        return null;
1442
        return null;
1400
      }
1443
      }
1401
    });
1444
    });
1402
  }
1445
  }
1403

    
   
1446

   
1404
  /**
1447
  /**
1405
   * Check whether region cache prefetch is enabled or not for the table.
1448
   * Check whether region cache prefetch is enabled or not for the table.
1406
   * @param conf The Configuration object to use.
1449
   * @param conf The Configuration object to use.
1407
   * @param tableName name of table to check
1450
   * @param tableName name of table to check
1408
   * @return true if table's region cache prefetch is enabled. Otherwise
1450
   * @return true if table's region cache prefetch is enabled. Otherwise
1409
   * it is disabled.
1452
   * it is disabled.
1410
   * @throws IOException
1453
   * @throws IOException
1411
   */
1454
   */
1412
  public static boolean getRegionCachePrefetch(final Configuration conf,
1455
  public static boolean getRegionCachePrefetch(final Configuration conf,
1413
      final byte[] tableName) throws IOException {
1456
      final byte[] tableName) throws IOException {
1414
    return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
1457
    return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
1415
      @Override
1458
      @Override
1416
      public Boolean connect(HConnection connection) throws IOException {
1459
      public Boolean connect(HConnection connection) throws IOException {
1417
        return connection.getRegionCachePrefetch(tableName);
1460
        return connection.getRegionCachePrefetch(tableName);
1418
      }
1461
      }
1419
    });
1462
    });
1420
  }
1463
  }
1421

    
   
1464

   
1422
  /**
1465
  /**
1423
   * Check whether region cache prefetch is enabled or not for the table.
1466
   * Check whether region cache prefetch is enabled or not for the table.
1424
   * @param tableName name of table to check
1467
   * @param tableName name of table to check
1425
   * @return true if table's region cache prefetch is enabled. Otherwise
1467
   * @return true if table's region cache prefetch is enabled. Otherwise
1426
   * it is disabled.
1469
   * it is disabled.
1427
   * @throws IOException
1470
   * @throws IOException
1428
   */
1471
   */
1429
  public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1472
  public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1430
    return HConnectionManager.execute(new HConnectable<Boolean>(
1473
    return HConnectionManager.execute(new HConnectable<Boolean>(
1431
        HBaseConfiguration.create()) {
1474
        HBaseConfiguration.create()) {
1432
      @Override
1475
      @Override
1433
      public Boolean connect(HConnection connection) throws IOException {
1476
      public Boolean connect(HConnection connection) throws IOException {
1434
        return connection.getRegionCachePrefetch(tableName);
1477
        return connection.getRegionCachePrefetch(tableName);
1435
      }
1478
      }
1436
    });
1479
    });
1437
 }
1480
 }
1438

    
   
1481

   
1439
  /**
1482
  /**
1440
   * Explicitly clears the region cache to fetch the latest value from META.
1483
   * Explicitly clears the region cache to fetch the latest value from META.
1441
   * This is a power user function: avoid unless you know the ramifications.
1484
   * This is a power user function: avoid unless you know the ramifications.
1442
   */
1485
   */
1443
  public void clearRegionCache() {
1486
  public void clearRegionCache() {
1444
    this.connection.clearRegionCache();
1487
    this.connection.clearRegionCache();
1445
  }
1488
  }
1446

    
   
1489

   
1447
  /**
1490
  /**
1448
   * {@inheritDoc}
1491
   * {@inheritDoc}
1449
   */
1492
   */
1450
  @Override
1493
  @Override
1451
  public <T extends CoprocessorProtocol> T coprocessorProxy(
1494
  public <T extends CoprocessorProtocol> T coprocessorProxy(
1452
      Class<T> protocol, byte[] row) {
1495
      Class<T> protocol, byte[] row) {
1453
    return (T)Proxy.newProxyInstance(this.getClass().getClassLoader(),
1496
    return (T)Proxy.newProxyInstance(this.getClass().getClassLoader(),
1454
        new Class[]{protocol},
1497
        new Class[]{protocol},
1455
        new ExecRPCInvoker(configuration,
1498
        new ExecRPCInvoker(configuration,
1456
            connection,
1499
            connection,
1457
            protocol,
1500
            protocol,
1458
            tableName,
1501
            tableName,
1459
            row));
1502
            row));
1460
  }
1503
  }
1461

    
   
1504

   
1462
  /**
1505
  /**
1463
   * {@inheritDoc}
1506
   * {@inheritDoc}
1464
   */
1507
   */
1465
  @Override
1508
  @Override
1466
  public <T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
1509
  public <T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
1467
      Class<T> protocol, byte[] startKey, byte[] endKey,
1510
      Class<T> protocol, byte[] startKey, byte[] endKey,
1468
      Batch.Call<T,R> callable)
1511
      Batch.Call<T,R> callable)
1469
      throws IOException, Throwable {
1512
      throws IOException, Throwable {
1470

    
   
1513

   
1471
    final Map<byte[],R> results = new TreeMap<byte[],R>(
1514
    final Map<byte[],R> results = new TreeMap<byte[],R>(
1472
        Bytes.BYTES_COMPARATOR);
1515
        Bytes.BYTES_COMPARATOR);
1473
    coprocessorExec(protocol, startKey, endKey, callable,
1516
    coprocessorExec(protocol, startKey, endKey, callable,
1474
        new Batch.Callback<R>(){
1517
        new Batch.Callback<R>(){
1475
      public void update(byte[] region, byte[] row, R value) {
1518
      public void update(byte[] region, byte[] row, R value) {
1476
        results.put(region, value);
1519
        results.put(region, value);
1477
      }
1520
      }
1478
    });
1521
    });
1479
    return results;
1522
    return results;
1480
  }
1523
  }
1481

    
   
1524

   
1482
  /**
1525
  /**
1483
   * {@inheritDoc}
1526
   * {@inheritDoc}
1484
   */
1527
   */
1485
  @Override
1528
  @Override
1486
  public <T extends CoprocessorProtocol, R> void coprocessorExec(
1529
  public <T extends CoprocessorProtocol, R> void coprocessorExec(
1487
      Class<T> protocol, byte[] startKey, byte[] endKey,
1530
      Class<T> protocol, byte[] startKey, byte[] endKey,
1488
      Batch.Call<T,R> callable, Batch.Callback<R> callback)
1531
      Batch.Call<T,R> callable, Batch.Callback<R> callback)
1489
      throws IOException, Throwable {
1532
      throws IOException, Throwable {
1490

    
   
1533

   
1491
    // get regions covered by the row range
1534
    // get regions covered by the row range
1492
    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1535
    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1493
    connection.processExecs(protocol, keys, tableName, pool, callable,
1536
    connection.processExecs(protocol, keys, tableName, pool, callable,
1494
        callback);
1537
        callback);
1495
  }
1538
  }
1496

    
   
1539

   
1497
  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1540
  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1498
  throws IOException {
1541
  throws IOException {
1499
    Pair<byte[][],byte[][]> startEndKeys = getStartEndKeys();
1542
    Pair<byte[][],byte[][]> startEndKeys = getStartEndKeys();
1500
    byte[][] startKeys = startEndKeys.getFirst();
1543
    byte[][] startKeys = startEndKeys.getFirst();
1501
    byte[][] endKeys = startEndKeys.getSecond();
1544
    byte[][] endKeys = startEndKeys.getSecond();
1502

    
   
1545

   
1503
    if (start == null) {
1546
    if (start == null) {
1504
      start = HConstants.EMPTY_START_ROW;
1547
      start = HConstants.EMPTY_START_ROW;
1505
    }
1548
    }
1506
    if (end == null) {
1549
    if (end == null) {
1507
      end = HConstants.EMPTY_END_ROW;
1550
      end = HConstants.EMPTY_END_ROW;
1508
    }
1551
    }
1509

    
   
1552

   
1510
    List<byte[]> rangeKeys = new ArrayList<byte[]>();
1553
    List<byte[]> rangeKeys = new ArrayList<byte[]>();
1511
    for (int i=0; i<startKeys.length; i++) {
1554
    for (int i=0; i<startKeys.length; i++) {
1512
      if (Bytes.compareTo(start, startKeys[i]) >= 0 ) {
1555
      if (Bytes.compareTo(start, startKeys[i]) >= 0 ) {
1513
        if (Bytes.equals(endKeys[i], HConstants.EMPTY_END_ROW) ||
1556
        if (Bytes.equals(endKeys[i], HConstants.EMPTY_END_ROW) ||
1514
            Bytes.compareTo(start, endKeys[i]) < 0) {
1557
            Bytes.compareTo(start, endKeys[i]) < 0) {
1515
          rangeKeys.add(start);
1558
          rangeKeys.add(start);
1516
        }
1559
        }
1517
      } else if (Bytes.equals(end, HConstants.EMPTY_END_ROW) ||
1560
      } else if (Bytes.equals(end, HConstants.EMPTY_END_ROW) ||
1518
          Bytes.compareTo(startKeys[i], end) <= 0) {
1561
          Bytes.compareTo(startKeys[i], end) <= 0) {
1519
        rangeKeys.add(startKeys[i]);
1562
        rangeKeys.add(startKeys[i]);
1520
      } else {
1563
      } else {
1521
        break; // past stop
1564
        break; // past stop
1522
      }
1565
      }
1523
    }
1566
    }
1524

    
   
1567

   
1525
    return rangeKeys;
1568
    return rangeKeys;
1526
  }
1569
  }
1527

    
   
1570

   
1528
  public void setOperationTimeout(int operationTimeout) {
1571
  public void setOperationTimeout(int operationTimeout) {
1529
    this.operationTimeout = operationTimeout;
1572
    this.operationTimeout = operationTimeout;
1530
  }
1573
  }
1531

    
   
1574

   
1532
  public int getOperationTimeout() {
1575
  public int getOperationTimeout() {
1533
    return operationTimeout;
1576
    return operationTimeout;
1534
  }
1577
  }
1535

    
   
1578

   
1536
}
1579
}
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
Revision 1176942 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java
Revision 1176942 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
Revision 1176942 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
New File
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
Revision 1176942 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
Revision 1176942 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
Revision 1176942 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
Revision 1176942 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
Revision 1176942 New Change
 
  1. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java: Loading...
  2. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java: Loading...
  3. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java: Loading...
  4. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java: Loading...
  5. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java: Loading...
  6. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java: Loading...
  7. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java: Loading...
  8. http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java: Loading...
  9. http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java: Loading...
  10. http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java: Loading...