Review Board 1.7.22


MAPREDUCE-2584. Check for serializers early, and give out more information regarding missing serializers.

Review Request #885 - Created June 11, 2011 and submitted

Harsh J
trunk
MAPREDUCE-2584
Reviewers
hadoop-mapreduce
hadoop-mapreduce-git
As discussed on HADOOP-7328, MapReduce can handle serializers in a much better way in case of bad configuration, improper imports (some odd Text class instead of the Writable Text set as key), etc.

This issue covers the MapReduce parts of the improvements (made to MapOutputBuffer and a possible early check of serializer availability pre-submit) that provide more information than just an NPE, as is currently the case.
Added a test case that expects a failure if no io.serializers are present.

Diff revision 4 (Latest)

1 2 3 4
1 2 3 4

  1. src/java/org/apache/hadoop/mapreduce/JobSubmitter.java: Loading...
  2. src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java: Loading...
src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
Revision 751d528 New Change
[20] 30 lines
[+20]
31
import org.apache.commons.logging.Log;
31
import org.apache.commons.logging.Log;
32
import org.apache.commons.logging.LogFactory;
32
import org.apache.commons.logging.LogFactory;
33
import org.apache.hadoop.classification.InterfaceAudience;
33
import org.apache.hadoop.classification.InterfaceAudience;
34
import org.apache.hadoop.classification.InterfaceStability;
34
import org.apache.hadoop.classification.InterfaceStability;
35
import org.apache.hadoop.conf.Configuration;
35
import org.apache.hadoop.conf.Configuration;

    
   
36
import org.apache.hadoop.fs.CommonConfigurationKeys;
36
import org.apache.hadoop.fs.FSDataOutputStream;
37
import org.apache.hadoop.fs.FSDataOutputStream;
37
import org.apache.hadoop.fs.FileSystem;
38
import org.apache.hadoop.fs.FileSystem;
38
import org.apache.hadoop.fs.FileUtil;
39
import org.apache.hadoop.fs.FileUtil;
39
import org.apache.hadoop.fs.Path;
40
import org.apache.hadoop.fs.Path;
40
import org.apache.hadoop.fs.permission.FsPermission;
41
import org.apache.hadoop.fs.permission.FsPermission;
41
import org.apache.hadoop.hdfs.DFSClient;
42
import org.apache.hadoop.hdfs.DFSClient;
42
import org.apache.hadoop.io.Text;
43
import org.apache.hadoop.io.Text;

    
   
44
import org.apache.hadoop.io.serializer.Deserializer;

    
   
45
import org.apache.hadoop.io.serializer.SerializationFactory;

    
   
46
import org.apache.hadoop.io.serializer.Serializer;
43
import org.apache.hadoop.mapred.JobConf;
47
import org.apache.hadoop.mapred.JobConf;
44
import org.apache.hadoop.mapred.QueueACL;
48
import org.apache.hadoop.mapred.QueueACL;
45
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
49
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
46

    
   
50

   
47
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
51
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
[+20] [20] 340 lines
[+20] [+] private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
388
          jtFs.delete(submitJobDir, true);
392
          jtFs.delete(submitJobDir, true);
389

    
   
393

   
390
      }
394
      }
391
    }
395
    }
392
  }
396
  }
393
  
397

   

    
   
398
  /**

    
   
399
   * Method that checks for typical Job specifications such as output

    
   
400
   * directories and serializers.

    
   
401
   * @param job The job to check specifications for.

    
   
402
   */
394
  private void checkSpecs(Job job) throws ClassNotFoundException, 
403
  private void checkSpecs(Job job) throws ClassNotFoundException, 
395
      InterruptedException, IOException {
404
      InterruptedException, IOException {
396
    JobConf jConf = (JobConf)job.getConfiguration();

   
397
    // Check the output specification
405
    // Check the output specification

    
   
406
    checkOutputSpecs(job);

    
   
407
    // Check serializer specification for map-outputs

    
   
408
    checkSerializerSpecs(job);

    
   
409
  }

    
   
410

   

    
   
411
  /**

    
   
412
   * Check a Job's output specs using its OutputFormat.

    
   
413
   * @param job The job to check output specs for.

    
   
414
   */

    
   
415
  private void checkOutputSpecs(Job job) throws IOException,

    
   
416
      InterruptedException, ClassNotFoundException {

    
   
417
    JobConf jConf = (JobConf)job.getConfiguration();
398
    if (jConf.getNumReduceTasks() == 0 ? 
418
    if (jConf.getNumReduceTasks() == 0 ? 
399
        jConf.getUseNewMapper() : jConf.getUseNewReducer()) {
419
        jConf.getUseNewMapper() : jConf.getUseNewReducer()) {
400
      org.apache.hadoop.mapreduce.OutputFormat<?, ?> output =
420
      org.apache.hadoop.mapreduce.OutputFormat<?, ?> output =
401
        ReflectionUtils.newInstance(job.getOutputFormatClass(),
421
        ReflectionUtils.newInstance(job.getOutputFormatClass(),
402
          job.getConfiguration());
422
          job.getConfiguration());
403
      output.checkOutputSpecs(job);
423
      output.checkOutputSpecs(job);
404
    } else {
424
    } else {
405
      jConf.getOutputFormat().checkOutputSpecs(jtFs, jConf);
425
      jConf.getOutputFormat().checkOutputSpecs(jtFs, jConf);
406
    }
426
    }
407
  }
427
  }
408
  
428

   

    
   
429
  /**

    
   
430
   * Checks if the Map-output K/V classes have serialization available.

    
   
431
   * @param job Job to check SerDe for.

    
   
432
   */

    
   
433
  @SuppressWarnings("rawtypes")

    
   
434
  private void checkSerializerSpecs(Job job) throws IOException {

    
   
435
    // Only check if there are reducers (i.e., Map Outputs will be made).

    
   
436
    if (job.getNumReduceTasks() > 0) {

    
   
437
      String msg = "Couldn't find a serialization class for the " +

    
   
438
                   "Map-Output %s class.";

    
   
439
      SerializationFactory serializationFactory =

    
   
440
        new SerializationFactory(job.getConfiguration());

    
   
441
      try {

    
   
442
        serializationFactory.getSerialization(job.getMapOutputKeyClass());

    
   
443
      } catch (IOException e) {

    
   
444
        throw new IOException(String.format(msg, "Key"), e);

    
   
445
      }

    
   
446
      try {

    
   
447
        serializationFactory.getSerialization(job.getMapOutputValueClass());

    
   
448
      } catch (IOException e) {

    
   
449
        throw new IOException(String.format(msg, "Value"), e);

    
   
450
      }

    
   
451
    }

    
   
452
  }

    
   
453

   
409
  private void writeConf(Configuration conf, Path jobFile) 
454
  private void writeConf(Configuration conf, Path jobFile) 
410
      throws IOException {
455
      throws IOException {
411
    // Write job file to JobTracker's fs        
456
    // Write job file to JobTracker's fs        
412
    FSDataOutputStream out = 
457
    FSDataOutputStream out = 
413
      FileSystem.create(jtFs, jobFile, 
458
      FileSystem.create(jtFs, jobFile, 
[+20] [20] 163 lines
src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java
Revision 5fa329a New Change
 
  1. src/java/org/apache/hadoop/mapreduce/JobSubmitter.java: Loading...
  2. src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java: Loading...