Review Board 1.7.22


MAPREDUCE-2584. Check for serializers early, and give out more information regarding missing serializers.

Review Request #885 - Created June 11, 2011 and submitted

Harsh J
trunk
MAPREDUCE-2584
Reviewers
hadoop-mapreduce
hadoop-mapreduce-git
As discussed on HADOOP-7328, MapReduce can handle serializers in a much better way in case of bad configuration, improper imports (Some odd Text class instead of the Writable Text set as key), etc..

This issue covers the MapReduce parts of the improvements (made to MapOutputBuffer and possible early-check of serializer availability pre-submit) that provide more information than just an NPE as is the current case.
Added a test case that expects a failure if no io.serializers are present.

Changes between revision 1 and 3

1 2 3 4
1 2 3 4

  1. src/java/org/apache/hadoop/mapreduce/JobSubmitter.java: Loading...
  2. src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java: Loading...
src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
Diff Revision 1 Diff Revision 3
[20] 38 lines
[+20]
39
import org.apache.hadoop.fs.FileUtil;
39
import org.apache.hadoop.fs.FileUtil;
40
import org.apache.hadoop.fs.Path;
40
import org.apache.hadoop.fs.Path;
41
import org.apache.hadoop.fs.permission.FsPermission;
41
import org.apache.hadoop.fs.permission.FsPermission;
42
import org.apache.hadoop.hdfs.DFSClient;
42
import org.apache.hadoop.hdfs.DFSClient;
43
import org.apache.hadoop.io.Text;
43
import org.apache.hadoop.io.Text;

    
   
44
import org.apache.hadoop.io.serializer.Deserializer;
44
import org.apache.hadoop.io.serializer.SerializationFactory;
45
import org.apache.hadoop.io.serializer.SerializationFactory;
45
import org.apache.hadoop.io.serializer.Serializer;
46
import org.apache.hadoop.io.serializer.Serializer;
46
import org.apache.hadoop.mapred.JobConf;
47
import org.apache.hadoop.mapred.JobConf;
47
import org.apache.hadoop.mapred.QueueACL;
48
import org.apache.hadoop.mapred.QueueACL;
48
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
49
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
[+20] [20] 346 lines
[+20] [+] private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
395
  }
396
  }
396

    
   
397

   
397
  /**
398
  /**
398
   * Method that checks for typical Job specifications such as output
399
   * Method that checks for typical Job specifications such as output
399
   * directories and serializers.
400
   * directories and serializers.
400
   * @param job - The job to check for.
401
   * @param job The job to check specifications for.
401
   * @throws ClassNotFoundException

   
402
   * @throws InterruptedException

   
403
   * @throws IOException

   
404
   */
402
   */
405
  private void checkSpecs(Job job) throws ClassNotFoundException, 
403
  private void checkSpecs(Job job) throws ClassNotFoundException, 
406
      InterruptedException, IOException {
404
      InterruptedException, IOException {
407
    // Check the output specification
405
    // Check the output specification
408
    checkOutputSpecs(job);
406
    checkOutputSpecs(job);
409
    // Check serializer specification for map-outputs
407
    // Check serializer specification for map-outputs
410
    checkSerializerSpecs(job);
408
    checkSerializerSpecs(job);
411
  }
409
  }
412

    
   
410

   
413
  /**
411
  /**
414
   * Checks the output specs for a given Job
412
   * Check a Job's output specs using its OutputFormat.
415
   * (Uses Job's provided OutputFormat to check)
413
   * @param job The job to check output specs for.
416
   * @param job - The job to check for.

   
417
   * @throws IOException

   
418
   * @throws InterruptedException

   
419
   * @throws ClassNotFoundException

   
420
   */
414
   */
421
  private void checkOutputSpecs(Job job) throws IOException,
415
  private void checkOutputSpecs(Job job) throws IOException,
422
      InterruptedException, ClassNotFoundException {
416
      InterruptedException, ClassNotFoundException {
423
    JobConf jConf = (JobConf)job.getConfiguration();
417
    JobConf jConf = (JobConf)job.getConfiguration();
424
    if (jConf.getNumReduceTasks() == 0 ? 
418
    if (jConf.getNumReduceTasks() == 0 ? 
[+20] [20] 6 lines
[+20] private void checkOutputSpecs(Job job) throws IOException,
431
      jConf.getOutputFormat().checkOutputSpecs(jtFs, jConf);
425
      jConf.getOutputFormat().checkOutputSpecs(jtFs, jConf);
432
    }
426
    }
433
  }
427
  }
434

    
   
428

   
435
  /**
429
  /**
436
   * Checks if all Key/Value classes have serializer and deserializers
430
   * Checks if the Map-output K/V classes have serializers available.
437
   * associated with them.
431
   * @param job Job to check SerDe for.
438
   * (Note: Only checks Map-output K/V classes, not general output ones.)

   
439
   * @param job - The Job to check SerDe for.

   
440
   * @throws IOException

   
441
   */
432
   */
442
  @SuppressWarnings("rawtypes")
433
  @SuppressWarnings("rawtypes")
443
  private void checkSerializerSpecs(Job job) throws IOException {
434
  private void checkSerializerSpecs(Job job) throws IOException {
444
    // Only check if there are reducers (i.e., Map Outputs will be made).
435
    // Only check if there are reducers (i.e., Map Outputs will be made).
445
    if (job.getNumReduceTasks() > 0) {
436
    if (job.getNumReduceTasks() > 0) {

    
   
437
      String msg = "Couldn't find a %s for the Map-Output %s class: '%s'. " +

    
   
438
                   "If custom serialization is being used, ensure that the " +

    
   
439
                   "'" + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' " +

    
   
440
                   "property is appropriately configured for the Job.";
446
      SerializationFactory serializationFactory = new SerializationFactory(
441
      SerializationFactory serializationFactory = new SerializationFactory(
447
          job.getConfiguration());
442
        job.getConfiguration());
448
      try {

   
449
        // Check Key-class
443
      // Check Key-class
450
        Serializer outputMapKeySerializer =
444
      Serializer outputMapKeySerializer =
451
          serializationFactory.getSerializer(job.getMapOutputKeyClass());
445
        serializationFactory.getSerializer(job.getMapOutputKeyClass());
452
        if (outputMapKeySerializer == null) {
446
      if (outputMapKeySerializer == null) {
453
          throw new IOException("Couldn't find a serializer for the "+
447
        throw new IOException(String.format(msg, "serializer", "Key"));
454
              "Map-Output Key class: '" +
448
      }
455
              job.getMapOutputKeyClass().getCanonicalName() + "'");
449
      Deserializer outputMapKeyDeserializer =

    
   
450
        serializationFactory.getDeserializer(job.getMapOutputKeyClass());

    
   
451
      if (outputMapKeyDeserializer == null) {

    
   
452
        throw new IOException(String.format(msg, "deserializer", "Key"));
456
        }
453
      }
457
        // Check Value-class
454
      // Check Value-class
458
        Serializer outputMapValueSerializer =
455
      Serializer outputMapValueSerializer =
459
          serializationFactory.getSerializer(job.getMapOutputValueClass());
456
        serializationFactory.getSerializer(job.getMapOutputValueClass());
460
        if (outputMapValueSerializer == null) {
457
      if (outputMapValueSerializer == null) {
461
          throw new IOException("Couldn't find a serializer for the "+
458
        throw new IOException(String.format(msg, "serializer", "Value"));
462
              "Map-Output Value class: '" +
459
      }
463
              job.getMapOutputValueClass().getCanonicalName() + "'");
460
      Deserializer outputMapValueDeserializer =
464
        }
461
        serializationFactory.getDeserializer(job.getMapOutputValueClass());
465
      } catch (IOException e) {
462
      if (outputMapValueDeserializer == null) {
466
        throw new IOException("Caught an IOException checking for serializers." +
463
        throw new IOException(String.format(msg, "deserializer", "Value"));
467
          " Ensure that the property: " +

   
468
          "'" + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "'" +

   
469
          "is properly setup if you're using custom serializers in the Job.", e);

   
470
      }
464
      }
471
    }
465
    }
472
  }
466
  }
473

    
   
467

   
474
  private void writeConf(Configuration conf, Path jobFile) 
468
  private void writeConf(Configuration conf, Path jobFile) 
[+20] [20] 167 lines
src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java
Diff Revision 1 Diff Revision 3
 
  1. src/java/org/apache/hadoop/mapreduce/JobSubmitter.java: Loading...
  2. src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java: Loading...