Review Board 1.7.22


HCatalog input format changes with integration of output format changes and record reader changes

Review Request #3901 - Created Feb. 14, 2012 and updated

Vikram Dixit Kumaraswamy
HCATALOG-239
Reviewers
francisliu, gates, sushanth
hcatalog
This is a patch with input format specific changes integrated with record reader and output format changes.

 
trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
Revision 1241662 New Change
[20] 28 lines
[+20]
29
import java.util.Iterator;
29
import java.util.Iterator;
30
import java.util.LinkedList;
30
import java.util.LinkedList;
31
import java.util.List;
31
import java.util.List;
32
import java.util.Map;
32
import java.util.Map;
33
import java.util.Map.Entry;
33
import java.util.Map.Entry;

    
   
34
import java.util.Properties;
34
import java.util.Set;
35
import java.util.Set;
35

    
   
36

   
36
import org.apache.commons.logging.Log;
37
import org.apache.commons.logging.Log;

    
   
38
import org.apache.commons.logging.LogFactory;
37
import org.apache.hadoop.conf.Configuration;
39
import org.apache.hadoop.conf.Configuration;
38
import org.apache.hadoop.fs.permission.FsAction;
40
import org.apache.hadoop.fs.permission.FsAction;
39
import org.apache.hadoop.hive.common.JavaUtils;
41
import org.apache.hadoop.hive.common.JavaUtils;
40
import org.apache.hadoop.hive.conf.HiveConf;
42
import org.apache.hadoop.hive.conf.HiveConf;

    
   
43
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
41
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
44
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
42
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
45
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
43
import org.apache.hadoop.hive.metastore.api.FieldSchema;
46
import org.apache.hadoop.hive.metastore.api.FieldSchema;
44
import org.apache.hadoop.hive.metastore.api.MetaException;
47
import org.apache.hadoop.hive.metastore.api.MetaException;
45
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
48
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
46
import org.apache.hadoop.hive.metastore.api.Table;
49
import org.apache.hadoop.hive.metastore.api.Table;
47
import org.apache.hadoop.hive.ql.metadata.HiveException;
50
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
48
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
51
import org.apache.hadoop.hive.ql.plan.TableDesc;

    
   
52
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    
   
53
import org.apache.hadoop.hive.serde2.SerDe;
49
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
54
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
50
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
55
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
51
import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
56
import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
52
import org.apache.hadoop.io.Text;
57
import org.apache.hadoop.io.Text;
53
import org.apache.hadoop.mapred.JobClient;
58
import org.apache.hadoop.mapred.JobClient;
54
import org.apache.hadoop.mapred.JobConf;
59
import org.apache.hadoop.mapred.JobConf;
55
import org.apache.hadoop.mapreduce.JobContext;
60
import org.apache.hadoop.mapreduce.JobContext;
56
import org.apache.hadoop.security.token.Token;
61
import org.apache.hadoop.security.token.Token;
57
import org.apache.hadoop.security.token.TokenIdentifier;
62
import org.apache.hadoop.security.token.TokenIdentifier;
58
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
63
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
59
import org.apache.hadoop.util.ReflectionUtils;
64
import org.apache.hadoop.util.ReflectionUtils;

    
   
65
import org.apache.hcatalog.data.DataType;

    
   
66
import org.apache.hcatalog.data.HCatRecord;

    
   
67
import org.apache.hcatalog.data.HCatRecordSerDe;
60
import org.apache.hcatalog.data.Pair;
68
import org.apache.hcatalog.data.Pair;
61
import org.apache.hcatalog.data.schema.HCatFieldSchema;
69
import org.apache.hcatalog.data.schema.HCatFieldSchema;
62
import org.apache.hcatalog.data.schema.HCatSchema;
70
import org.apache.hcatalog.data.schema.HCatSchema;
63
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
71
import org.apache.hcatalog.data.schema.HCatSchemaUtils;

    
   
72
import org.apache.hcatalog.mapreduce.FosterStorageHandler;
64
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
73
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
65
import org.apache.hcatalog.storagehandler.HCatStorageHandler;
74
import org.apache.hcatalog.mapreduce.HCatStorageHandler;

    
   
75
import org.apache.hcatalog.mapreduce.OutputJobInfo;

    
   
76
import org.apache.hcatalog.mapreduce.StorerInfo;
66
import org.apache.thrift.TException;
77
import org.apache.thrift.TException;
67

    
   
78

   
68
public class HCatUtil {
79
public class HCatUtil {
69

    
   
80

   
70
    // static final private Log LOG = LogFactory.getLog(HCatUtil.class);
81
//     static final private Log LOG = LogFactory.getLog(HCatUtil.class);
71

    
   
82

   
72
    public static boolean checkJobContextIfRunningFromBackend(JobContext j) {
83
    public static boolean checkJobContextIfRunningFromBackend(JobContext j) {
73
        if (j.getConfiguration().get("mapred.task.id", "").equals("")) {
84
        if (j.getConfiguration().get("mapred.task.id", "").equals("")) {
74
            return false;
85
            return false;
75
        }
86
        }
[+20] [20] 313 lines
[+20] [+] public static void cancelJobTrackerDelegationToken(String tokenStrForm,
389
     * @param logger
400
     * @param logger
390
     */
401
     */
391
    public static void logStackTrace(Log logger) {
402
    public static void logStackTrace(Log logger) {
392
        StackTraceElement[] stackTrace = new Exception().getStackTrace();
403
        StackTraceElement[] stackTrace = new Exception().getStackTrace();
393
        for (int i = 1; i < stackTrace.length; i++) {
404
        for (int i = 1; i < stackTrace.length; i++) {
394
            logger.info("\t" + stackTrace[i].toString());
405
            logger.debug("\t" + stackTrace[i].toString());
395
        }
406
        }
396
    }
407
    }
397

    
   
408

   
398
    /**
409
    /**
399
     * debug log the hive conf
410
     * debug log the hive conf
[+20] [20] 6 lines
[+20] [+] public static void logHiveConf(Log logger, HiveConf hc) {
406
                .entrySet());
417
                .entrySet());
407
    }
418
    }
408

    
   
419

   
409
    public static void logList(Log logger, String itemName,
420
    public static void logList(Log logger, String itemName,
410
            List<? extends Object> list) {
421
            List<? extends Object> list) {
411
        logger.info(itemName + ":");
422
        logger.debug(itemName + ":");
412
        for (Object item : list) {
423
        for (Object item : list) {
413
            logger.info("\t[" + item + "]");
424
            logger.debug("\t[" + item + "]");
414
        }
425
        }
415
    }
426
    }
416

    
   
427

   
417
    public static void logMap(Log logger, String itemName,
428
    public static void logMap(Log logger, String itemName,
418
            Map<? extends Object, ? extends Object> map) {
429
            Map<? extends Object, ? extends Object> map) {
[+20] [20] 28 lines
[+20] [+] public static void logToken(Log logger, String itemName,
447
        logger.info("\ttoString : " + t.toString());
458
        logger.info("\ttoString : " + t.toString());
448
        logger.info("\tkind : " + t.getKind());
459
        logger.info("\tkind : " + t.getKind());
449
        logger.info("\tservice : " + t.getService());
460
        logger.info("\tservice : " + t.getService());
450
    }
461
    }
451

    
   
462

   

    
   
463
    /**

    
   
464
     * Create an instance of a storage handler defined in storerInfo. If one cannot be found

    
   
465
     * then FosterStorageHandler is used to encapsulate the InputFormat, OutputFormat and SerDe.

    
   
466
     * This StorageHandler assumes the other supplied storage artifacts are for a file-based storage system.

    
   
467
     * @param conf job's configuration will be used to configure the Configurable StorageHandler

    
   
468
     * @param storerInfo StorerInfo defining the StorageHandler and InputFormat, OutputFormat and SerDe

    
   
469
     * @return storageHandler instance

    
   
470
     * @throws IOException

    
   
471
     */

    
   
472
    public static HCatStorageHandler getStorageHandler(Configuration conf, StorerInfo storerInfo) throws IOException {

    
   
473
        return getStorageHandler(conf,

    
   
474
                                              storerInfo.getStorageHandlerClass(),

    
   
475
                                              storerInfo.getSerdeClass(),

    
   
476
                                              storerInfo.getIfClass(),

    
   
477
                                              storerInfo.getOfClass());

    
   
478
    }

    
   
479

   

    
   
480
    /**

    
   
481
     * Create an instance of a storage handler. If storageHandler == null,

    
   
482
     * then a surrogate StorageHandler is used to encapsulate the InputFormat, OutputFormat and SerDe.

    
   
483
     * This StorageHandler assumes the other supplied storage artifacts are for a file-based storage system.

    
   
484
     * @param conf job's configuration will be used to configure the Configurable StorageHandler

    
   
485
     * @param storageHandler fully qualified class name of the desired StorageHandle instance

    
   
486
     * @param serDe fully qualified class name of the desired SerDe instance

    
   
487
     * @param inputFormat fully qualified class name of the desired InputFormat instance

    
   
488
     * @param outputFormat fully qualified class name of the desired outputFormat instance

    
   
489
     * @return storageHandler instance

    
   
490
     * @throws IOException

    
   
491
     */
452
    public static HCatStorageHandler getStorageHandler(Configuration conf,
492
    public static HCatStorageHandler getStorageHandler(Configuration conf,
453
            String className) throws HiveException {
493
                                                                                  String storageHandler,

    
   
494
                                                                                  String serDe,

    
   
495
                                                                                  String inputFormat,

    
   
496
                                                                                  String outputFormat) throws IOException {
454

    
   
497

   
455
        if (className == null) {
498

   
456
            return null;
499
        if (storageHandler == null) {

    
   
500
            try {

    
   
501
                return new FosterStorageHandler(inputFormat,

    
   
502
                                                                  outputFormat,

    
   
503
                                                                  serDe);

    
   
504
            } catch (ClassNotFoundException e) {

    
   
505
                throw new IOException("Failed to load foster storage handler",e);

    
   
506
            }
457
        }
507
        }

    
   
508

   
458
        try {
509
        try {
459
            Class<? extends HCatStorageHandler> handlerClass = (Class<? extends HCatStorageHandler>) Class
510
            Class<? extends HCatStorageHandler> handlerClass = (Class<? extends HCatStorageHandler>) Class
460
                    .forName(className, true, JavaUtils.getClassLoader());
511
                    .forName(storageHandler, true, JavaUtils.getClassLoader());
461
            HCatStorageHandler storageHandler = (HCatStorageHandler) ReflectionUtils
512
            return (HCatStorageHandler)ReflectionUtils.newInstance(handlerClass, conf);
462
                    .newInstance(handlerClass, conf);

   
463
            return storageHandler;

   
464
        } catch (ClassNotFoundException e) {
513
        } catch (ClassNotFoundException e) {
465
            throw new HiveException("Error in loading storage handler."
514
            throw new IOException("Error in loading storage handler."
466
                    + e.getMessage(), e);
515
                    + e.getMessage(), e);
467
        }
516
        }
468
    }
517
    }
469

    
   
518

   
470
    public static Pair<String,String> getDbAndTableName(String tableName) throws IOException{
519
    public static Pair<String,String> getDbAndTableName(String tableName) throws IOException{
[+20] [20] 5 lines
[+20] public static HCatStorageHandler getStorageHandler(Configuration conf,
476
      }else{
525
      }else{
477
        throw new IOException("tableName expected in the form "
526
        throw new IOException("tableName expected in the form "
478
            +"<databasename>.<table name> or <table name>. Got " + tableName);
527
            +"<databasename>.<table name> or <table name>. Got " + tableName);
479
      }
528
      }
480
    }
529
    }

    
   
530

   

    
   
531
    public static void configureOutputStorageHandler(HCatStorageHandler storageHandler,

    
   
532
                                                                              JobContext context,

    
   
533
                                                                              OutputJobInfo outputJobInfo) {

    
   
534
        //TODO replace IgnoreKeyTextOutputFormat with a HiveOutputFormatWrapper in StorageHandler

    
   
535
        TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),

    
   
536
                                                                   storageHandler.getInputFormatClass(),

    
   
537
                                                                   IgnoreKeyTextOutputFormat.class,

    
   
538
                                                                   outputJobInfo.getTableInfo().getStorerInfo().getProperties());

    
   
539
        if(tableDesc.getJobProperties() == null)

    
   
540
            tableDesc.setJobProperties(new HashMap<String, String>());

    
   
541
        for (Map.Entry<String,String> el: context.getConfiguration()) {

    
   
542
           tableDesc.getJobProperties().put(el.getKey(),el.getValue());

    
   
543
        }

    
   
544

   

    
   
545
        Map<String,String> jobProperties = new HashMap<String,String>();

    
   
546
        try {

    
   
547
            tableDesc.getJobProperties().put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));

    
   
548

   

    
   
549
            storageHandler.configureOutputJobProperties(tableDesc,jobProperties);

    
   
550

   

    
   
551
            for(Map.Entry<String,String> el: jobProperties.entrySet()) {

    
   
552
                context.getConfiguration().set(el.getKey(),el.getValue());

    
   
553
            }

    
   
554
        } catch (IOException e) {

    
   
555
            throw new IllegalStateException("Failed to configure StorageHandler",e);

    
   
556
        }

    
   
557
    }

    
   
558

   

    
   
559
    /**

    
   
560
     * Replace the contents of dest with the contents of src

    
   
561
     * @param src

    
   
562
     * @param dest

    
   
563
     */

    
   
564
    public static void copyConf(Configuration src, Configuration dest) {

    
   
565
        dest.clear();

    
   
566
        for(Map.Entry<String,String> el : src) {

    
   
567
            dest.set(el.getKey(),el.getValue());

    
   
568
        }

    
   
569
    }

    
   
570

   

    
   
571
    /**

    
   
572
     * Get the input reader for this input format. This is responsible for 

    
   
573
     * ensuring the input is correctly read.

    
   
574
     * @param context of the task

    
   
575
     * @return an input split

    
   
576
     * @throws IOException

    
   
577
     * @throws InterruptedException

    
   
578
     */

    
   
579
    public static HiveMetaStoreClient createHiveClient(String url, 

    
   
580
                                    Configuration conf, Class<?> cls) 

    
   
581
      throws IOException, MetaException {

    
   
582
        HiveConf hiveConf = getHiveConf(url, conf, cls);

    
   
583
        try {

    
   
584
          return new HiveMetaStoreClient(hiveConf);

    
   
585
        } catch (MetaException e) {

    
   
586
          // LOG.error("Error connecting to the metastore (conf follows): " 

    
   
587
          //           + e.getMessage(), e);

    
   
588
          // HCatUtil.logHiveConf(LOG, hiveConf);

    
   
589
          throw e;

    
   
590
        }

    
   
591
    }

    
   
592

   

    
   
593
    public static HiveConf getHiveConf (String url, 

    
   
594
                                        Configuration conf, Class<?> cls) 

    
   
595
      throws IOException {

    
   
596
      HiveConf hiveConf = new HiveConf(cls);

    
   
597
      if (url != null) {

    
   
598
        // User specified a thrift url

    
   
599
        hiveConf.set("hive.metastore.local", "false");

    
   
600
        hiveConf.set(ConfVars.METASTOREURIS.varname, url);

    
   
601

   

    
   
602
        String kerberosPrincipal = conf.get(

    
   
603
            HCatConstants.HCAT_METASTORE_PRINCIPAL);

    
   
604
        if (kerberosPrincipal == null) {

    
   
605
          kerberosPrincipal = conf.get(

    
   
606
              ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);

    
   
607
        }

    
   
608
        if (kerberosPrincipal != null) {

    
   
609
          hiveConf.setBoolean(ConfVars.METASTORE_USE_THRIFT_SASL.varname,

    
   
610
              true);

    
   
611
          hiveConf.set(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, 

    
   
612
              kerberosPrincipal);

    
   
613
        }

    
   
614
      } else {

    
   
615
        // Thrift url is null, 

    
   
616
        // copy the hive conf into the job conf and restore it

    
   
617
        // in the backend

    
   
618
        if (conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null) {

    
   
619
          conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, 

    
   
620
              HCatUtil.serialize(hiveConf.getAllProperties()));

    
   
621
        } else {

    
   
622
          // Copy configuration properties into the hive conf

    
   
623
          Properties properties = (Properties) HCatUtil.deserialize(

    
   
624
              conf.get(HCatConstants.HCAT_KEY_HIVE_CONF));

    
   
625
          for (Map.Entry<Object, Object> prop: properties.entrySet()) {

    
   
626
            if (prop.getValue() instanceof String) {

    
   
627
              hiveConf.set((String)prop.getKey(), (String)prop.getValue());

    
   
628
            } else if (prop.getValue() instanceof Integer) {

    
   
629
              hiveConf.setInt((String)prop.getKey(), (Integer)prop.getValue());

    
   
630
            } else if (prop.getValue() instanceof Boolean) {

    
   
631
              hiveConf.setBoolean((String)prop.getKey(), 

    
   
632
                  (Boolean)prop.getValue());

    
   
633
            } else if (prop.getValue() instanceof Long) {

    
   
634
              hiveConf.setLong((String)prop.getKey(),

    
   
635
                  (Long)prop.getValue());

    
   
636
            } else if (prop.getValue() instanceof Float) {

    
   
637
              hiveConf.setFloat((String)prop.getKey(), (Float)prop.getValue());

    
   
638
            }

    
   
639
          }

    
   
640
        }

    
   
641
      }

    
   
642

   

    
   
643
      if (conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {

    
   
644
        hiveConf.set("hive.metastore.token.signature", 

    
   
645
            conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));

    
   
646
      }

    
   
647

   

    
   
648
      return hiveConf;

    
   
649
    }

    
   
650

   

    
   
651
    public static int compareRecords(HCatRecord first, HCatRecord second) {

    
   
652
      try {

    
   
653
        return compareRecordContents(first.getAll(), second.getAll());

    
   
654

   

    
   
655
      } catch (HCatException e) {

    
   
656
        // uh-oh - we've hit a deserialization error in all likelihood

    
   
657
        // we're likely not going to recover from this. Alright, the best 

    
   
658
        // we can do is throw a ClassCastException (the only thing throwable

    
   
659
        // from a compareTo, and in a sense, inability to read the object

    
   
660
        // is what got us here.

    
   
661
        throw new ClassCastException(e.getMessage());

    
   
662
      }

    
   
663
    }

    
   
664

   

    
   
665
    public static int compareRecordContents(List<Object> first, List<Object> second) {

    
   
666
      int mySz = first.size();

    
   
667
      int urSz = second.size();

    
   
668
      if(mySz != urSz) {

    
   
669
        return mySz - urSz;

    
   
670
      } else {

    
   
671
        for (int i = 0; i < first.size(); i++) {

    
   
672
          int c = DataType.compare(first.get(i), second.get(i));

    
   
673
          if (c != 0) {

    
   
674
            return c;

    
   
675
          }

    
   
676
        }

    
   
677
        return 0;

    
   
678
      }

    
   
679
    }

    
   
680

   

    
   
681
    public static ObjectInspector getObjectInspector(String serdeClassName, 

    
   
682
        Configuration conf, Properties tbl) throws Exception {

    
   
683
      SerDe s = (SerDe) Class.forName(serdeClassName).newInstance();

    
   
684
      s.initialize(conf, tbl);

    
   
685
      return s.getObjectInspector();

    
   
686
    }

    
   
687

   

    
   
688
    public static ObjectInspector getHCatRecordObjectInspector(HCatSchema hsch) throws Exception{

    
   
689
      HCatRecordSerDe hrsd = new HCatRecordSerDe();

    
   
690
      hrsd.initialize(hsch);

    
   
691
      return hrsd.getObjectInspector();

    
   
692
    }
481
}
693
}
trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
Revision 1241662 New Change
 
trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java
Revision 1241662 New Change
 
trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
Revision 1241662 New Change
 
trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
Revision 1241662 New Change
 
trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java
Revision 1241662 New Change
 
trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java
Revision 1241662 New Change
 
  1. trunk/src/java/org/apache/hcatalog/common/HCatUtil.java: Loading...
  2. trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java: Loading...
  3. trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java: Loading...
  4. trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java: Loading...
  5. trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java: Loading...
  6. trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java: Loading...
  7. trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java: Loading...