Review Board 1.7.22


Add support for pulling HBase columns with prefixes

Review Request #9276 - Created Feb. 3, 2013 and updated

Swarnim Kulkarni
trunk
HIVE-3725
Reviewers
hive
hive-git
Added support for pulling HBase columns by specifying only a qualifier prefix followed by a wildcard. A table definition can now look something like this:

CREATE EXTERNAL TABLE hive_hbase_test
ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe' 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,fam1:col.*") 
TBLPROPERTIES ("hbase.table.name" = "TEST_HBASE_TABLE");

This would pull in all columns under column family "fam1" whose qualifiers start with "col". This gives a little more flexibility than the existing format, which pulls in all columns of a family.
Added unit tests to demonstrate the new functionality. Also made sure that all existing unit tests passed.
hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
Revision 65c81bf New Change
[20] 30 lines
[+20]
31
import org.apache.hadoop.hbase.client.Result;
31
import org.apache.hadoop.hbase.client.Result;
32
import org.apache.hadoop.hbase.util.Bytes;
32
import org.apache.hadoop.hbase.util.Bytes;
33
import org.apache.hadoop.hive.serde.serdeConstants;
33
import org.apache.hadoop.hive.serde.serdeConstants;
34
import org.apache.hadoop.hive.serde2.AbstractSerDe;
34
import org.apache.hadoop.hive.serde2.AbstractSerDe;
35
import org.apache.hadoop.hive.serde2.ByteStream;
35
import org.apache.hadoop.hive.serde2.ByteStream;
36
import org.apache.hadoop.hive.serde2.SerDe;

   
37
import org.apache.hadoop.hive.serde2.SerDeException;
36
import org.apache.hadoop.hive.serde2.SerDeException;
38
import org.apache.hadoop.hive.serde2.SerDeStats;
37
import org.apache.hadoop.hive.serde2.SerDeStats;
39
import org.apache.hadoop.hive.serde2.SerDeUtils;
38
import org.apache.hadoop.hive.serde2.SerDeUtils;
40
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
39
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
41
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
40
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
[+20] [20] 25 lines
[+20] [+] public class HBaseSerDe extends AbstractSerDe {
67
  public static final String HBASE_KEY_COL = ":key";
66
  public static final String HBASE_KEY_COL = ":key";
68
  public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp";
67
  public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp";
69
  public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
68
  public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
70
  public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
69
  public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
71
  public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";
70
  public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";

    
   
71
  public static final String HBASE_COLUMNS_REGEX_MATCHING = "hbase.columns.mapping.regex.matching";
72

    
   
72

   
73
  public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
73
  public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
74

    
   
74

   
75
  private ObjectInspector cachedObjectInspector;
75
  private ObjectInspector cachedObjectInspector;
76
  private String hbaseColumnsMapping;
76
  private String hbaseColumnsMapping;

    
   
77
  private boolean doColumnRegexMatching;
77
  private List<ColumnMapping> columnsMapping;
78
  private List<ColumnMapping> columnsMapping;
78
  private SerDeParameters serdeParams;
79
  private SerDeParameters serdeParams;
79
  private boolean useJSONSerialize;
80
  private boolean useJSONSerialize;
80
  private LazyHBaseRow cachedHBaseRow;
81
  private LazyHBaseRow cachedHBaseRow;
81
  private final ByteStream.Output serializeStream = new ByteStream.Output();
82
  private final ByteStream.Output serializeStream = new ByteStream.Output();
[+20] [20] 64 lines
[+20] [+] public void initialize(Configuration conf, Properties tbl)
146
   * @return List<ColumnMapping> which contains the column mapping information by position
147
   * @return List<ColumnMapping> which contains the column mapping information by position
147
   * @throws SerDeException
148
   * @throws SerDeException
148
   */
149
   */
149
  public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec)
150
  public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec)
150
      throws SerDeException {
151
      throws SerDeException {

    
   
152
    return parseColumnsMapping(columnsMappingSpec, true);

    
   
153
  }

    
   
154

   

    
   
155
  /**

    
   
156
   * Parses the HBase columns mapping specifier to identify the column families, qualifiers

    
   
157
   * and also caches the byte arrays corresponding to them. One of the Hive table

    
   
158
   * columns maps to the HBase row key, by default the first column.

    
   
159
   *

    
   
160
   * @param columnsMappingSpec string hbase.columns.mapping specified when creating table

    
   
161
   * @param doColumnRegexMatching whether to do a regex matching on the columns or not

    
   
162
   * @return List<ColumnMapping> which contains the column mapping information by position

    
   
163
   * @throws SerDeException

    
   
164
   */

    
   
165
  public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec, boolean doColumnRegexMatching)

    
   
166
      throws SerDeException {
151

    
   
167

   
152
    if (columnsMappingSpec == null) {
168
    if (columnsMappingSpec == null) {
153
      throw new SerDeException("Error: hbase.columns.mapping missing for this HBase table.");
169
      throw new SerDeException("Error: hbase.columns.mapping missing for this HBase table.");
154
    }
170
    }
155

    
   
171

   
[+20] [20] 35 lines
[+20] public void initialize(Configuration conf, Properties tbl)
191
        columnMapping.familyName = parts[0];
207
        columnMapping.familyName = parts[0];
192
        columnMapping.familyNameBytes = Bytes.toBytes(parts[0]);
208
        columnMapping.familyNameBytes = Bytes.toBytes(parts[0]);
193
        columnMapping.hbaseRowKey = false;
209
        columnMapping.hbaseRowKey = false;
194

    
   
210

   
195
        if (parts.length == 2) {
211
        if (parts.length == 2) {

    
   
212

   

    
   
213
          if (doColumnRegexMatching && parts[1].endsWith(".*")) {

    
   
214
            // we have a prefix with a wildcard

    
   
215
            columnMapping.qualifierPrefix = parts[1].substring(0, parts[1].length() - 2);

    
   
216
            columnMapping.qualifierPrefixBytes = Bytes.toBytes(columnMapping.qualifierPrefix);

    
   
217
            // we weren't provided any actual qualifier name. Set these to

    
   
218
            // null.

    
   
219
            columnMapping.qualifierName = null;

    
   
220
            columnMapping.qualifierNameBytes = null;

    
   
221
          } else {

    
   
222
            // set the regular provided qualifier names
196
          columnMapping.qualifierName = parts[1];
223
            columnMapping.qualifierName = parts[1];
197
          columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]);
224
            columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]);

    
   
225
            ;

    
   
226
          }
198
        } else {
227
        } else {
199
          columnMapping.qualifierName = null;
228
          columnMapping.qualifierName = null;
200
          columnMapping.qualifierNameBytes = null;
229
          columnMapping.qualifierNameBytes = null;
201
        }
230
        }
202
      }
231
      }
[+20] [20] 208 lines
[+20] [+] static class ColumnMapping {
411
    byte [] familyNameBytes;
440
    byte [] familyNameBytes;
412
    byte [] qualifierNameBytes;
441
    byte [] qualifierNameBytes;
413
    List<Boolean> binaryStorage;
442
    List<Boolean> binaryStorage;
414
    boolean hbaseRowKey;
443
    boolean hbaseRowKey;
415
    String mappingSpec;
444
    String mappingSpec;

    
   
445
    String qualifierPrefix;

    
   
446
    byte[] qualifierPrefixBytes;
416
  }
447
  }
417

    
   
448

   
418
  private void initHBaseSerDeParameters(
449
  private void initHBaseSerDeParameters(
419
      Configuration job, Properties tbl, String serdeName)
450
      Configuration job, Properties tbl, String serdeName)
420
    throws SerDeException {
451
    throws SerDeException {
421

    
   
452

   
422
    // Read configuration parameters
453
    // Read configuration parameters
423
    hbaseColumnsMapping = tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
454
    hbaseColumnsMapping = tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
424
    String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
455
    String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
425
    putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,"-1"));
456
    putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,"-1"));
426

    
   
457

   

    
   
458
    doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBASE_COLUMNS_REGEX_MATCHING, "true"));

    
   
459

   
427
    // Parse and initialize the HBase columns mapping
460
    // Parse and initialize the HBase columns mapping
428
    columnsMapping = parseColumnsMapping(hbaseColumnsMapping);
461
    columnsMapping = parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
429

    
   
462

   
430
    // Build the type property string if not supplied
463
    // Build the type property string if not supplied
431
    if (columnTypeProperty == null) {
464
    if (columnTypeProperty == null) {
432
      StringBuilder sb = new StringBuilder();
465
      StringBuilder sb = new StringBuilder();
433

    
   
466

   
[+20] [20] 362 lines
[+20] [+] public void setUseJSONSerialize(boolean useJSONSerialize) {
796

    
   
829

   
797
  List<Boolean> getStorageFormatOfCol(int colPos){
830
  List<Boolean> getStorageFormatOfCol(int colPos){
798
    return columnsMapping.get(colPos).binaryStorage;
831
    return columnsMapping.get(colPos).binaryStorage;
799
  }
832
  }
800

    
   
833

   

    
   
834
  @Override
801
  public SerDeStats getSerDeStats() {
835
  public SerDeStats getSerDeStats() {
802
    // no support for statistics
836
    // no support for statistics
803
    return null;
837
    return null;
804
  }
838
  }
805

    
   
839

   
[+20] [20] 19 lines
hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
Revision b550f45 New Change
 
hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
Revision 01938a7 New Change
 
hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
Revision a8ba9d9 New Change
 
hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
Revision 10a9207 New Change
 
hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
Revision e821282 New Change
 
  1. hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java: Loading...
  2. hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java: Loading...
  3. hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java: Loading...
  4. hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java: Loading...
  5. hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java: Loading...
  6. hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java: Loading...