ProgressReporter should work with both old and new MR API

Review Request #4971 - Created May 2, 2012 and updated

Submitter: Travis Crawford
Bugs: HCATALOG-373
Reviewers: hcatalog, francisliu
Repository: hcatalog-git
Description:
Update ProgressReporter to work with both the old and the new MapReduce API. Delay creating the base record reader so that a StatusReporter is available and counters can be used.

Testing Done:
"ant clean test" passes.

I can run Pig + HCatalog queries using Elephant-Bird's deprecated-API wrappers, which is why this issue originally came up.
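
The ProgressReporter changes are not expanded in the diff below, but the idea in the summary is a single reporter that satisfies the old org.apache.hadoop.mapred.Reporter interface while drawing status, progress and counters from a new-API context. The following is only a rough sketch of that kind of bridge, assuming the reporter wraps either a TaskInputOutputContext or a bare TaskAttemptContext; the class and member names are illustrative, not the patch's actual code.

// Sketch only (hypothetical class name): a reporter usable from both
// MapReduce APIs. It extends the new-API StatusReporter and implements the
// old-API Reporter, delegating to whichever context it was constructed with.
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

class DualApiProgressReporter extends StatusReporter implements Reporter {

  // A TaskInputOutputContext can report counters and status; a bare
  // TaskAttemptContext only provides progress().
  private TaskInputOutputContext<?, ?, ?, ?> ioContext = null;
  private TaskAttemptContext attemptContext = null;

  DualApiProgressReporter(TaskAttemptContext context) {
    if (context instanceof TaskInputOutputContext) {
      this.ioContext = (TaskInputOutputContext<?, ?, ?, ?>) context;
    } else {
      this.attemptContext = context;
    }
  }

  @Override
  public void setStatus(String status) {
    if (ioContext != null) {
      ioContext.setStatus(status);
    }
  }

  @Override
  public Counters.Counter getCounter(Enum<?> name) {
    // Old-API callers expect mapred Counters.Counter instances; the new-API
    // context returns compatible counters at runtime in this setup.
    return ioContext != null ? (Counters.Counter) ioContext.getCounter(name) : null;
  }

  @Override
  public Counters.Counter getCounter(String group, String name) {
    return ioContext != null
        ? (Counters.Counter) ioContext.getCounter(group, name) : null;
  }

  @Override
  public void incrCounter(Enum<?> key, long amount) {
    if (ioContext != null) {
      ioContext.getCounter(key).increment(amount);
    }
  }

  @Override
  public void incrCounter(String group, String counter, long amount) {
    if (ioContext != null) {
      ioContext.getCounter(group, counter).increment(amount);
    }
  }

  @Override
  public InputSplit getInputSplit() throws UnsupportedOperationException {
    throw new UnsupportedOperationException("Input split not available to a reporter");
  }

  @Override
  public void progress() {
    if (ioContext != null) {
      ioContext.progress();
    } else {
      attemptContext.progress();
    }
  }

  // Declared without @Override so the sketch compiles on Hadoop versions
  // where Reporter/StatusReporter do not yet define getProgress().
  public float getProgress() {
    return 0.0f;
  }
}

Anything holding a TaskAttemptContext can hand a reporter like this to an old-API getRecordReader() call, which is presumably what lets wrapped readers such as Elephant-Bird's deprecated-API wrappers report progress and counters.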
src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java

--- HCatBaseInputFormat.java (Revision 268167e)
+++ HCatBaseInputFormat.java (New Change)
(... 22 unchanged lines at the top of the file not shown ...)

@@ -23,19 +23,16 @@
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
 
-import org.apache.hadoop.hive.serde2.SerDe;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
(... 39 unchanged lines not shown ...)

@@ -81,11 +78,11 @@
     throws IOException {
     job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA,
                                HCatUtil.serialize(hcatSchema));
   }
 
-  private static
+  protected static
     org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable>
     getMapRedInputFormat (JobConf job, Class inputFormatClass) throws IOException {
       return (
           org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable>)
         ReflectionUtils.newInstance(inputFormatClass, job);
(... 84 unchanged lines not shown ...)

@@ -176,58 +173,29 @@
   @Override
   public RecordReader<WritableComparable, HCatRecord>
   createRecordReader(InputSplit split,
       TaskAttemptContext taskContext) throws IOException, InterruptedException {
 
-    HCatSplit hcatSplit = (HCatSplit) split;
+    HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
     PartInfo partitionInfo = hcatSplit.getPartitionInfo();
     JobContext jobContext = taskContext;
 
     HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
         jobContext.getConfiguration(), partitionInfo);
 
     JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
-
-    Class inputFormatClass = storageHandler.getInputFormatClass();
-    org.apache.hadoop.mapred.InputFormat inputFormat =
-                              getMapRedInputFormat(jobConf, inputFormatClass);
-
     Map<String, String> jobProperties = partitionInfo.getJobProperties();
     HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
-    Reporter reporter = InternalUtil.createReporter(taskContext);
-    org.apache.hadoop.mapred.RecordReader recordReader =
-      inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf, reporter);
-
-    SerDe serde;
-    try {
-      serde = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
-                                          jobContext.getConfiguration());
-
-//    HCatUtil.logEntrySet(LOG, "props to serde", properties.entrySet());
-
-      Configuration conf = storageHandler.getConf();
-      InternalUtil.initializeInputSerDe(serde, conf,
-                                  partitionInfo.getTableInfo(),partitionInfo.getPartitionSchema());
-
-    } catch (Exception e) {
-      throw new IOException("Unable to create objectInspector "
-          + "for serde class " + storageHandler.getSerDeClass().getName()
-          + e);
-    }
 
     Map<String,String> valuesNotInDataCols = getColValsNotInDataColumns(
         getOutputSchema(jobContext),partitionInfo
         );
 
-    HCatRecordReader hcatRecordReader = new HCatRecordReader(storageHandler,
-                                                             recordReader,
-                                                             serde,
-                                                             valuesNotInDataCols);
-    return hcatRecordReader;
+    return new HCatRecordReader(storageHandler, valuesNotInDataCols);
   }
 
 
   /**
    * gets values for fields requested by output schema which will not be in the data
    */
   private static Map<String,String> getColValsNotInDataColumns(HCatSchema outputSchema,
       PartInfo partInfo){

(... 116 more lines not shown ...)
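
The hunk above only removes the eager construction from createRecordReader; under the delayed scheme described in the summary, the base record reader is built later inside HCatRecordReader, once the framework supplies a TaskAttemptContext (and with it a StatusReporter for counters). The HCatRecordReader.java diff is not expanded on this page, so the helper below is only a sketch of what that delayed creation could look like; the method name createBaseRecordReader and its placement are assumptions, not the patch's code.

// Sketch only: building the underlying old-API record reader from the
// TaskAttemptContext that HCatRecordReader.initialize() receives. The helper
// name is hypothetical; the calls themselves mirror the code removed above.
private static org.apache.hadoop.mapred.RecordReader<WritableComparable, Writable>
    createBaseRecordReader(HCatSplit hcatSplit,
                           HCatStorageHandler storageHandler,
                           TaskAttemptContext taskContext) throws IOException {
  JobConf jobConf = HCatUtil.getJobConfFromContext(taskContext);
  HCatUtil.copyJobPropertiesToJobConf(
      hcatSplit.getPartitionInfo().getJobProperties(), jobConf);

  // Reachable from HCatRecordReader because the diff above widens
  // getMapRedInputFormat from private to protected (same-package access).
  org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable> inputFormat =
      HCatBaseInputFormat.getMapRedInputFormat(
          jobConf, storageHandler.getInputFormatClass());

  // The reporter wraps the new-API task context, so progress and counter
  // updates from the wrapped reader reach the task's StatusReporter.
  return inputFormat.getRecordReader(
      hcatSplit.getBaseSplit(), jobConf, InternalUtil.createReporter(taskContext));
}

Calling this from initialize() rather than from createRecordReader() is presumably what the summary means by delaying creation until a StatusReporter is available.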
src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
Revision 65f96f4 (diff not shown here)
 
src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
Revision 1837081 (diff not shown here)
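
On the new side of the createRecordReader hunk above, the bare (HCatSplit) cast is replaced by InternalUtil.castToHCatSplit(split). This file's diff is not expanded here; presumably the helper centralizes the type check and produces a clearer error. A minimal sketch, with the message wording being an assumption:

// Sketch of a defensive cast helper along the lines the new code calls.
static HCatSplit castToHCatSplit(org.apache.hadoop.mapreduce.InputSplit split)
    throws IOException {
  if (split instanceof HCatSplit) {
    return (HCatSplit) split;
  }
  // Throwing IOException keeps createRecordReader's declared exceptions unchanged.
  throw new IOException("Split must be an HCatSplit, but got "
      + (split == null ? "null" : split.getClass().getName()));
}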
 
src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java
Revision fb379cd (diff not shown here)
 
src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
Revision f3d07a0 (diff not shown here)
 