Review Board 1.7.22


MAPREDUCE-2324 Job should fail if a reduce task can't be scheduled anywhere

Review Request #1164 - Created July 21, 2011 and updated

Submitter: Robert Evans
Bugs: MAPREDUCE-2324
Reviewers (groups): hadoop-mapreduce
Reviewers (people): naisbitt, tgraves, tlipcon
Repository: hadoop-common

Description: Job should fail if a reduce task can't be scheduled anywhere. V2 of the patch.

Testing Done: Unit tests, plus manual testing on a single-node cluster.

branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Revision 1148035 New Change
[113 unchanged lines not shown; nearest context: static class KillInterruptedException extends InterruptedException { ]

   int failedReduceTasks = 0;
   private static long DEFAULT_REDUCE_INPUT_LIMIT = -1L;
   long reduce_input_limit = -1L;
   private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
   int completedMapsForReduceSlowstart = 0;
+  private static float DEFAULT_REDUCE_INPUT_ATTEMPT_FACTOR = 1.0f;
+  float reduceInputAttemptFactor = DEFAULT_REDUCE_INPUT_ATTEMPT_FACTOR;
+  HashSet<String> failedReduceInputAttempts = new HashSet<String>();

   // runningMapTasks include speculative tasks, so we need to capture
   // speculative tasks separately
   int speculativeMapTasks = 0;
   int speculativeReduceTasks = 0;
[350 unchanged lines not shown; nearest context: public FileSystem run() throws IOException { ]

       this.nonRunningReduces = new TreeSet<TaskInProgress>(failComparator);
       this.runningReduces = new LinkedHashSet<TaskInProgress>();
       this.resourceEstimator = new ResourceEstimator(this);
       this.reduce_input_limit = conf.getLong("mapreduce.reduce.input.limit",
           DEFAULT_REDUCE_INPUT_LIMIT);
+      this.reduceInputAttemptFactor = conf.getFloat("mapreduce.reduce.input.limit.attempt.factor",
+          DEFAULT_REDUCE_INPUT_ATTEMPT_FACTOR);
       // register job's tokens for renewal
       DelegationTokenRenewal.registerDelegationTokensForRenewal(
           jobInfo.getJobID(), ts, jobtracker.getConf());

       // Check task limits
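
The constructor change above reads a new job property, mapreduce.reduce.input.limit.attempt.factor, alongside the existing mapreduce.reduce.input.limit. As a hedged illustration (not part of the patch; the class name and values below are made up), a job could set both knobs like this:

    import org.apache.hadoop.mapred.JobConf;

    // Illustrative sketch only, not code from this patch. The class name and
    // values are arbitrary examples; the defaults are -1 (limit check disabled)
    // and 1.0f.
    public class ReduceInputLimitExample {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Estimated reduce input size (bytes) above which reduces cannot be
        // scheduled; -1, the default, disables the check.
        conf.setLong("mapreduce.reduce.input.limit", 10L * 1024 * 1024 * 1024);
        // New in this patch: fraction of unique hosts that must lack the disk
        // space for a reduce before the job is failed outright.
        conf.setFloat("mapreduce.reduce.input.limit.attempt.factor", 1.0f);
      }
    }
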
[1955 unchanged lines not shown; nearest context: private synchronized int findNewReduceTask(TaskTrackerStatus tts, ... ]

     }

     long outSize = resourceEstimator.getEstimatedReduceInputSize();
     long availSpace = tts.getResourceStatus().getAvailableSpace();
     if(availSpace < outSize) {
-      LOG.warn("No room for reduce task. Node " + taskTracker + " has " +
-                availSpace +
-               " bytes free; but we expect reduce input to take " + outSize);
-
+      failedReduceInputAttempts.add(tts.host);
+      int attemptLimit = (int)reduceInputAttemptFactor * numUniqueHosts;
+      int attempts = failedReduceInputAttempts.size();
+      if(attempts >= attemptLimit) {
+        // make sure jobtracker lock is held
+        LOG.info("We tried to schedule a reduce task on " +
+            attempts + " unique nodes with a limit of " +
+            attemptLimit + ". Failing Job " + jobId);
+        status.setFailureInfo("Job exceeded Reduce scheduling limit because" +
+            " of not enough disk space. Limit: " + attemptLimit +
+            " nodes tried: " + attempts + " needed space: " +
+            outSize);
+        jobtracker.failJob(this);
+      } else {
+        LOG.warn("No room for reduce task " + jobId + ". Node " + taskTracker +
+            " has " + availSpace +
+            " bytes free; but we expect reduce input to take " + outSize);
+      }
       return -1; //see if a different TIP might work better.
     }

     // 1. check for a never-executed reduce tip
     // reducers don't have a cache and so pass -1 to explicitly call that out
     tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
     if (tip != null) {
       scheduleReduce(tip);
+      failedReduceInputAttempts.clear();
       return tip.getIdWithinJob();
     }

     // 2. check for a reduce tip to be speculated
     if (hasSpeculativeReduces) {
       tip = findSpeculativeTask(runningReduces, tts, avgProgress,
                                 jobtracker.getClock().getTime(), false);
       if (tip != null) {
         scheduleReduce(tip);
+        failedReduceInputAttempts.clear();
         return tip.getIdWithinJob();
       }
     }

     return -1;

[1041 unchanged lines not shown]
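
For context on the accounting this hunk adds to findNewReduceTask(): each TaskTracker host that lacks the disk space for the estimated reduce input is remembered in failedReduceInputAttempts, and the job is failed once the number of distinct hosts reaches the attempt limit computed from reduceInputAttemptFactor and numUniqueHosts (with the default factor of 1.0, the limit equals the number of unique hosts). The set is cleared whenever a reduce is successfully scheduled, so only consecutive rejections count toward the limit. The following standalone sketch is illustrative only; the class and variable names are made up and it is not code from the patch:

    import java.util.HashSet;

    // Standalone sketch (not the patch itself) of the limit check: remember
    // each host that had no room for the reduce and report failure once
    // attemptFactor * numUniqueHosts distinct hosts have been tried.
    public class ReduceSchedulingLimitSketch {
      public static void main(String[] args) {
        int numUniqueHosts = 20;      // example cluster size
        float attemptFactor = 1.0f;   // default from the patch
        int attemptLimit = (int) (attemptFactor * numUniqueHosts);

        HashSet<String> failedHosts = new HashSet<String>();
        for (int i = 1; i <= numUniqueHosts; i++) {
          failedHosts.add("host" + i); // simulated "no room for reduce" events
        }
        if (failedHosts.size() >= attemptLimit) {
          System.out.println("Would fail the job: " + failedHosts.size()
              + " unique hosts rejected the reduce, limit " + attemptLimit);
        }
      }
    }
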
branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Revision 1148035 New Change
 
branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
Revision 1148035 New Change
 
branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskLimits.java
Revision 1148035 New Change
 