MAPREDUCE-2584. Check for serializers early, and give out more information regarding missing serializers.

Review Request #885 - Created June 11, 2011 and submitted

Submitter: Harsh J
Branch: trunk
Bugs: MAPREDUCE-2584
Reviewers: hadoop-mapreduce
Repository: hadoop-mapreduce-git
As discussed on HADOOP-7328, MapReduce can handle serializers much more gracefully in cases of bad configuration, improper imports (e.g., some odd Text class instead of the Writable Text set as the key), and so on.

This issue covers the MapReduce parts of the improvements (made to MapOutputBuffer, plus a possible early check of serializer availability before job submission) that provide more information than the bare NPE thrown today.
Added a test case that expects a failure when no serializers (io.serializations) are configured.
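
To make the intended behavior concrete, here is a minimal sketch of such an early check (a hypothetical illustration, not the actual patch; the SerializerCheck class and ensureSerializable method are made-up names, while SerializationFactory is the real Hadoop API the diff below already uses):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.serializer.SerializationFactory;

// Hypothetical helper sketching the early-check idea from this review.
public final class SerializerCheck {
  private SerializerCheck() {}

  // Fail fast with a descriptive message instead of the NPE that currently
  // surfaces deep inside MapOutputBuffer when no serializer matches.
  public static void ensureSerializable(Configuration conf, Class<?> cls,
                                        String what) {
    SerializationFactory factory = new SerializationFactory(conf);
    if (factory.getSerialization(cls) == null) {
      throw new IllegalArgumentException("No serializer found for the "
          + what + " class " + cls.getName() + "; check the "
          + "'io.serializations' property and verify that the intended "
          + "key/value types are used.");
    }
  }
}

A submitter would call something like ensureSerializable(job, job.getMapOutputKeyClass(), "map output key") with the JobConf before handing the job to the cluster.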

Diff revision 1

This is not the most recent revision of the diff; the latest diff is revision 4.


  1. src/java/org/apache/hadoop/mapred/MapTask.java
  2. src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
  3. src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java
src/java/org/apache/hadoop/mapred/MapTask.java
Revision 21599c2
1
/**
1
/**
2
 * Licensed to the Apache Software Foundation (ASF) under one
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
8
 * with the License.  You may obtain a copy of the License at
9
 *
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
16
 * limitations under the License.
17
 */
17
 */
18

    
   
18

   
19
package org.apache.hadoop.mapred;
19
package org.apache.hadoop.mapred;
20

    
   
20

   
21
import java.io.DataInput;
21
import java.io.DataInput;
22
import java.io.DataOutput;
22
import java.io.DataOutput;
23
import java.io.DataOutputStream;
23
import java.io.DataOutputStream;
24
import java.io.IOException;
24
import java.io.IOException;
25
import java.io.OutputStream;
25
import java.io.OutputStream;
26
import java.nio.ByteBuffer;
26
import java.nio.ByteBuffer;
27
import java.nio.IntBuffer;
27
import java.nio.IntBuffer;
28
import java.util.ArrayList;
28
import java.util.ArrayList;
29
import java.util.List;
29
import java.util.List;
30
import java.util.concurrent.locks.Condition;
30
import java.util.concurrent.locks.Condition;
31
import java.util.concurrent.locks.ReentrantLock;
31
import java.util.concurrent.locks.ReentrantLock;
32

    
   
32

   
33
import org.apache.commons.logging.Log;
33
import org.apache.commons.logging.Log;
34
import org.apache.commons.logging.LogFactory;
34
import org.apache.commons.logging.LogFactory;
35
import org.apache.hadoop.fs.FSDataInputStream;
35
import org.apache.hadoop.fs.FSDataInputStream;
36
import org.apache.hadoop.fs.FSDataOutputStream;
36
import org.apache.hadoop.fs.FSDataOutputStream;
37
import org.apache.hadoop.fs.FileSystem;
37
import org.apache.hadoop.fs.FileSystem;
38
import org.apache.hadoop.fs.LocalDirAllocator;
38
import org.apache.hadoop.fs.LocalDirAllocator;
39
import org.apache.hadoop.fs.LocalFileSystem;
39
import org.apache.hadoop.fs.LocalFileSystem;
40
import org.apache.hadoop.fs.Path;
40
import org.apache.hadoop.fs.Path;
41
import org.apache.hadoop.io.DataInputBuffer;
41
import org.apache.hadoop.io.DataInputBuffer;
42
import org.apache.hadoop.io.RawComparator;
42
import org.apache.hadoop.io.RawComparator;
43
import org.apache.hadoop.io.SequenceFile;
43
import org.apache.hadoop.io.SequenceFile;
44
import org.apache.hadoop.io.SequenceFile.CompressionType;
44
import org.apache.hadoop.io.SequenceFile.CompressionType;
45
import org.apache.hadoop.io.Text;
45
import org.apache.hadoop.io.Text;
46
import org.apache.hadoop.io.compress.CompressionCodec;
46
import org.apache.hadoop.io.compress.CompressionCodec;
47
import org.apache.hadoop.io.compress.DefaultCodec;
47
import org.apache.hadoop.io.compress.DefaultCodec;
48
import org.apache.hadoop.io.serializer.Deserializer;
48
import org.apache.hadoop.io.serializer.Deserializer;
49
import org.apache.hadoop.io.serializer.SerializationFactory;
49
import org.apache.hadoop.io.serializer.SerializationFactory;
50
import org.apache.hadoop.io.serializer.Serializer;
50
import org.apache.hadoop.io.serializer.Serializer;
51
import org.apache.hadoop.mapred.IFile.Writer;
51
import org.apache.hadoop.mapred.IFile.Writer;
52
import org.apache.hadoop.mapred.Merger.Segment;
52
import org.apache.hadoop.mapred.Merger.Segment;
53
import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
53
import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
54
import org.apache.hadoop.mapreduce.MRConfig;
54
import org.apache.hadoop.mapreduce.MRConfig;
55
import org.apache.hadoop.mapreduce.MRJobConfig;
55
import org.apache.hadoop.mapreduce.MRJobConfig;
56
import org.apache.hadoop.mapreduce.TaskAttemptContext;
56
import org.apache.hadoop.mapreduce.TaskAttemptContext;
57
import org.apache.hadoop.mapreduce.TaskCounter;
57
import org.apache.hadoop.mapreduce.TaskCounter;
58
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
58
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
59
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
59
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
60
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
60
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
61
import org.apache.hadoop.mapreduce.task.MapContextImpl;
61
import org.apache.hadoop.mapreduce.task.MapContextImpl;
62
import org.apache.hadoop.util.IndexedSortable;
62
import org.apache.hadoop.util.IndexedSortable;
63
import org.apache.hadoop.util.IndexedSorter;
63
import org.apache.hadoop.util.IndexedSorter;
64
import org.apache.hadoop.util.Progress;
64
import org.apache.hadoop.util.Progress;
65
import org.apache.hadoop.util.QuickSort;
65
import org.apache.hadoop.util.QuickSort;
66
import org.apache.hadoop.util.ReflectionUtils;
66
import org.apache.hadoop.util.ReflectionUtils;
67
import org.apache.hadoop.util.StringUtils;
67
import org.apache.hadoop.util.StringUtils;
68

    
   
68

   
69
/** A Map task. */
69
/** A Map task. */
70
class MapTask extends Task {
70
class MapTask extends Task {
71
  /**
71
  /**
72
   * The size of each record in the index file for the map-outputs.
72
   * The size of each record in the index file for the map-outputs.
73
   */
73
   */
74
  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
74
  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
75

    
   
75

   
76
  private TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
76
  private TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
77
  private final static int APPROX_HEADER_LENGTH = 150;
77
  private final static int APPROX_HEADER_LENGTH = 150;
78

    
   
78

   
79
  private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
79
  private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
80

    
   
80

   
81
  private Progress mapPhase;
81
  private Progress mapPhase;
82
  private Progress sortPhase;
82
  private Progress sortPhase;
83
  
83
  
84
  {   // set phase for this task
84
  {   // set phase for this task
85
    setPhase(TaskStatus.Phase.MAP); 
85
    setPhase(TaskStatus.Phase.MAP); 
86
    getProgress().setStatus("map");
86
    getProgress().setStatus("map");
87
  }
87
  }
88

    
   
88

   
89
  public MapTask() {
89
  public MapTask() {
90
    super();
90
    super();
91
  }
91
  }
92

    
   
92

   
93
  public MapTask(String jobFile, TaskAttemptID taskId, 
93
  public MapTask(String jobFile, TaskAttemptID taskId, 
94
                 int partition, TaskSplitIndex splitIndex,
94
                 int partition, TaskSplitIndex splitIndex,
95
                 int numSlotsRequired) {
95
                 int numSlotsRequired) {
96
    super(jobFile, taskId, partition, numSlotsRequired);
96
    super(jobFile, taskId, partition, numSlotsRequired);
97
    this.splitMetaInfo = splitIndex;
97
    this.splitMetaInfo = splitIndex;
98
  }
98
  }
99

    
   
99

   
100
  @Override
100
  @Override
101
  public boolean isMapTask() {
101
  public boolean isMapTask() {
102
    return true;
102
    return true;
103
  }
103
  }
104

    
   
104

   
105
  @Override
105
  @Override
106
  public void localizeConfiguration(JobConf conf)
106
  public void localizeConfiguration(JobConf conf)
107
      throws IOException {
107
      throws IOException {
108
    super.localizeConfiguration(conf);
108
    super.localizeConfiguration(conf);
109
    // split.dta/split.info files are used only by IsolationRunner.
109
    // split.dta/split.info files are used only by IsolationRunner.
110
    // Write the split file to the local disk if it is a normal map task (not a
110
    // Write the split file to the local disk if it is a normal map task (not a
111
    // job-setup or a job-cleanup task) and if the user wishes to run
111
    // job-setup or a job-cleanup task) and if the user wishes to run
112
    // IsolationRunner either by setting keep.failed.tasks.files to true or by
112
    // IsolationRunner either by setting keep.failed.tasks.files to true or by
113
    // using keep.tasks.files.pattern
113
    // using keep.tasks.files.pattern
114
    if (supportIsolationRunner(conf) && isMapOrReduce()) {
114
    if (supportIsolationRunner(conf) && isMapOrReduce()) {
115
      // localize the split meta-information 
115
      // localize the split meta-information 
116
      Path localSplitMeta =
116
      Path localSplitMeta =
117
          new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathForWrite(
117
          new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathForWrite(
118
              TaskTracker.getLocalSplitMetaFile(conf.getUser(), 
118
              TaskTracker.getLocalSplitMetaFile(conf.getUser(), 
119
                getJobID().toString(), getTaskID()
119
                getJobID().toString(), getTaskID()
120
                  .toString()), conf);
120
                  .toString()), conf);
121
      LOG.debug("Writing local split to " + localSplitMeta);
121
      LOG.debug("Writing local split to " + localSplitMeta);
122
      DataOutputStream out = FileSystem.getLocal(conf).create(localSplitMeta);
122
      DataOutputStream out = FileSystem.getLocal(conf).create(localSplitMeta);
123
      splitMetaInfo.write(out);
123
      splitMetaInfo.write(out);
124
      out.close();
124
      out.close();
125
    }
125
    }
126
  }
126
  }
127
  
127
  
128
  
128
  
129
  @Override
129
  @Override
130
  public TaskRunner createRunner(TaskTracker tracker, 
130
  public TaskRunner createRunner(TaskTracker tracker, 
131
      TaskTracker.TaskInProgress tip) {
131
      TaskTracker.TaskInProgress tip) {
132
    return new MapTaskRunner(tip, tracker, this.conf);
132
    return new MapTaskRunner(tip, tracker, this.conf);
133
  }
133
  }
134

    
   
134

   
135
  @Override
135
  @Override
136
  public void write(DataOutput out) throws IOException {
136
  public void write(DataOutput out) throws IOException {
137
    super.write(out);
137
    super.write(out);
138
    if (isMapOrReduce()) {
138
    if (isMapOrReduce()) {
139
      splitMetaInfo.write(out);
139
      splitMetaInfo.write(out);
140
      splitMetaInfo = null;
140
      splitMetaInfo = null;
141
    }
141
    }
142
  }
142
  }
143
  
143
  
144
  @Override
144
  @Override
145
  public void readFields(DataInput in) throws IOException {
145
  public void readFields(DataInput in) throws IOException {
146
    super.readFields(in);
146
    super.readFields(in);
147
    if (isMapOrReduce()) {
147
    if (isMapOrReduce()) {
148
      splitMetaInfo.readFields(in);
148
      splitMetaInfo.readFields(in);
149
    }
149
    }
150
  }
150
  }
151

    
   
151

   
152
  /**
152
  /**
153
   * This class wraps the user's record reader to update the counters and progress
153
   * This class wraps the user's record reader to update the counters and progress
154
   * as records are read.
154
   * as records are read.
155
   * @param <K>
155
   * @param <K>
156
   * @param <V>
156
   * @param <V>
157
   */
157
   */
158
  class TrackedRecordReader<K, V> 
158
  class TrackedRecordReader<K, V> 
159
      implements RecordReader<K,V> {
159
      implements RecordReader<K,V> {
160
    private RecordReader<K,V> rawIn;
160
    private RecordReader<K,V> rawIn;
161
    private Counters.Counter inputByteCounter;
161
    private Counters.Counter inputByteCounter;
162
    private Counters.Counter inputRecordCounter;
162
    private Counters.Counter inputRecordCounter;
163
    private TaskReporter reporter;
163
    private TaskReporter reporter;
164
    private long beforePos = -1;
164
    private long beforePos = -1;
165
    private long afterPos = -1;
165
    private long afterPos = -1;
166
    
166
    
167
    TrackedRecordReader(RecordReader<K,V> raw, TaskReporter reporter) 
167
    TrackedRecordReader(RecordReader<K,V> raw, TaskReporter reporter) 
168
      throws IOException{
168
      throws IOException{
169
      rawIn = raw;
169
      rawIn = raw;
170
      inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
170
      inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
171
      inputByteCounter = reporter.getCounter(
171
      inputByteCounter = reporter.getCounter(
172
                           FileInputFormat.COUNTER_GROUP,
172
                           FileInputFormat.COUNTER_GROUP,
173
                           FileInputFormat.BYTES_READ);
173
                           FileInputFormat.BYTES_READ);
174
      this.reporter = reporter;
174
      this.reporter = reporter;
175
    }
175
    }
176

    
   
176

   
177
    public K createKey() {
177
    public K createKey() {
178
      return rawIn.createKey();
178
      return rawIn.createKey();
179
    }
179
    }
180
      
180
      
181
    public V createValue() {
181
    public V createValue() {
182
      return rawIn.createValue();
182
      return rawIn.createValue();
183
    }
183
    }
184
     
184
     
185
    public synchronized boolean next(K key, V value)
185
    public synchronized boolean next(K key, V value)
186
    throws IOException {
186
    throws IOException {
187
      boolean ret = moveToNext(key, value);
187
      boolean ret = moveToNext(key, value);
188
      if (ret) {
188
      if (ret) {
189
        incrCounters();
189
        incrCounters();
190
      }
190
      }
191
      return ret;
191
      return ret;
192
    }
192
    }
193
    
193
    
194
    protected void incrCounters() {
194
    protected void incrCounters() {
195
      inputRecordCounter.increment(1);
195
      inputRecordCounter.increment(1);
196
      inputByteCounter.increment(afterPos - beforePos);
196
      inputByteCounter.increment(afterPos - beforePos);
197
    }
197
    }
198
     
198
     
199
    protected synchronized boolean moveToNext(K key, V value)
199
    protected synchronized boolean moveToNext(K key, V value)
200
      throws IOException {
200
      throws IOException {
201
      beforePos = getPos();
201
      beforePos = getPos();
202
      boolean ret = rawIn.next(key, value);
202
      boolean ret = rawIn.next(key, value);
203
      afterPos = getPos();
203
      afterPos = getPos();
204
      reporter.setProgress(getProgress());
204
      reporter.setProgress(getProgress());
205
      return ret;
205
      return ret;
206
    }
206
    }
207
    
207
    
208
    public long getPos() throws IOException { return rawIn.getPos(); }
208
    public long getPos() throws IOException { return rawIn.getPos(); }
209
    public void close() throws IOException { rawIn.close(); }
209
    public void close() throws IOException { rawIn.close(); }
210
    public float getProgress() throws IOException {
210
    public float getProgress() throws IOException {
211
      return rawIn.getProgress();
211
      return rawIn.getProgress();
212
    }
212
    }
213
    TaskReporter getTaskReporter() {
213
    TaskReporter getTaskReporter() {
214
      return reporter;
214
      return reporter;
215
    }
215
    }
216
  }
216
  }
217

    
   
217

   
218
  /**
218
  /**
219
   * This class skips the records based on the failed ranges from previous 
219
   * This class skips the records based on the failed ranges from previous 
220
   * attempts.
220
   * attempts.
221
   */
221
   */
222
  class SkippingRecordReader<K, V> extends TrackedRecordReader<K,V> {
222
  class SkippingRecordReader<K, V> extends TrackedRecordReader<K,V> {
223
    private SkipRangeIterator skipIt;
223
    private SkipRangeIterator skipIt;
224
    private SequenceFile.Writer skipWriter;
224
    private SequenceFile.Writer skipWriter;
225
    private boolean toWriteSkipRecs;
225
    private boolean toWriteSkipRecs;
226
    private TaskUmbilicalProtocol umbilical;
226
    private TaskUmbilicalProtocol umbilical;
227
    private Counters.Counter skipRecCounter;
227
    private Counters.Counter skipRecCounter;
228
    private long recIndex = -1;
228
    private long recIndex = -1;
229
    
229
    
230
    SkippingRecordReader(RecordReader<K,V> raw, TaskUmbilicalProtocol umbilical,
230
    SkippingRecordReader(RecordReader<K,V> raw, TaskUmbilicalProtocol umbilical,
231
                         TaskReporter reporter) throws IOException{
231
                         TaskReporter reporter) throws IOException{
232
      super(raw, reporter);
232
      super(raw, reporter);
233
      this.umbilical = umbilical;
233
      this.umbilical = umbilical;
234
      this.skipRecCounter = reporter.getCounter(TaskCounter.MAP_SKIPPED_RECORDS);
234
      this.skipRecCounter = reporter.getCounter(TaskCounter.MAP_SKIPPED_RECORDS);
235
      this.toWriteSkipRecs = toWriteSkipRecs() &&  
235
      this.toWriteSkipRecs = toWriteSkipRecs() &&  
236
        SkipBadRecords.getSkipOutputPath(conf)!=null;
236
        SkipBadRecords.getSkipOutputPath(conf)!=null;
237
      skipIt = getSkipRanges().skipRangeIterator();
237
      skipIt = getSkipRanges().skipRangeIterator();
238
    }
238
    }
239
    
239
    
240
    public synchronized boolean next(K key, V value)
240
    public synchronized boolean next(K key, V value)
241
    throws IOException {
241
    throws IOException {
242
      if(!skipIt.hasNext()) {
242
      if(!skipIt.hasNext()) {
243
        LOG.warn("Further records got skipped.");
243
        LOG.warn("Further records got skipped.");
244
        return false;
244
        return false;
245
      }
245
      }
246
      boolean ret = moveToNext(key, value);
246
      boolean ret = moveToNext(key, value);
247
      long nextRecIndex = skipIt.next();
247
      long nextRecIndex = skipIt.next();
248
      long skip = 0;
248
      long skip = 0;
249
      while(recIndex<nextRecIndex && ret) {
249
      while(recIndex<nextRecIndex && ret) {
250
        if(toWriteSkipRecs) {
250
        if(toWriteSkipRecs) {
251
          writeSkippedRec(key, value);
251
          writeSkippedRec(key, value);
252
        }
252
        }
253
      	ret = moveToNext(key, value);
253
      	ret = moveToNext(key, value);
254
        skip++;
254
        skip++;
255
      }
255
      }
256
      //close the skip writer once all the ranges are skipped
256
      //close the skip writer once all the ranges are skipped
257
      if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) {
257
      if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) {
258
        skipWriter.close();
258
        skipWriter.close();
259
      }
259
      }
260
      skipRecCounter.increment(skip);
260
      skipRecCounter.increment(skip);
261
      reportNextRecordRange(umbilical, recIndex);
261
      reportNextRecordRange(umbilical, recIndex);
262
      if (ret) {
262
      if (ret) {
263
        incrCounters();
263
        incrCounters();
264
      }
264
      }
265
      return ret;
265
      return ret;
266
    }
266
    }
267
    
267
    
268
    protected synchronized boolean moveToNext(K key, V value)
268
    protected synchronized boolean moveToNext(K key, V value)
269
    throws IOException {
269
    throws IOException {
270
	    recIndex++;
270
	    recIndex++;
271
      return super.moveToNext(key, value);
271
      return super.moveToNext(key, value);
272
    }
272
    }
273
    
273
    
274
    @SuppressWarnings("unchecked")
274
    @SuppressWarnings("unchecked")
275
    private void writeSkippedRec(K key, V value) throws IOException{
275
    private void writeSkippedRec(K key, V value) throws IOException{
276
      if(skipWriter==null) {
276
      if(skipWriter==null) {
277
        Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
277
        Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
278
        Path skipFile = new Path(skipDir, getTaskID().toString());
278
        Path skipFile = new Path(skipDir, getTaskID().toString());
279
        skipWriter = 
279
        skipWriter = 
280
          SequenceFile.createWriter(
280
          SequenceFile.createWriter(
281
              skipFile.getFileSystem(conf), conf, skipFile,
281
              skipFile.getFileSystem(conf), conf, skipFile,
282
              (Class<K>) createKey().getClass(),
282
              (Class<K>) createKey().getClass(),
283
              (Class<V>) createValue().getClass(), 
283
              (Class<V>) createValue().getClass(), 
284
              CompressionType.BLOCK, getTaskReporter());
284
              CompressionType.BLOCK, getTaskReporter());
285
      }
285
      }
286
      skipWriter.append(key, value);
286
      skipWriter.append(key, value);
287
    }
287
    }
288
  }
288
  }
289

    
   
289

   
290
  @Override
290
  @Override
291
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
291
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
292
    throws IOException, ClassNotFoundException, InterruptedException {
292
    throws IOException, ClassNotFoundException, InterruptedException {
293
    this.umbilical = umbilical;
293
    this.umbilical = umbilical;
294

    
   
294

   
295
    if (isMapTask()) {
295
    if (isMapTask()) {
296
      // If there are no reducers then there won't be any sort. Hence the map 
296
      // If there are no reducers then there won't be any sort. Hence the map 
297
      // phase will govern the entire attempt's progress.
297
      // phase will govern the entire attempt's progress.
298
      if (conf.getNumReduceTasks() == 0) {
298
      if (conf.getNumReduceTasks() == 0) {
299
        mapPhase = getProgress().addPhase("map", 1.0f);
299
        mapPhase = getProgress().addPhase("map", 1.0f);
300
      } else {
300
      } else {
301
        // If there are reducers then the entire attempt's progress will be 
301
        // If there are reducers then the entire attempt's progress will be 
302
        // split between the map phase (67%) and the sort phase (33%).
302
        // split between the map phase (67%) and the sort phase (33%).
303
        mapPhase = getProgress().addPhase("map", 0.667f);
303
        mapPhase = getProgress().addPhase("map", 0.667f);
304
        sortPhase  = getProgress().addPhase("sort", 0.333f);
304
        sortPhase  = getProgress().addPhase("sort", 0.333f);
305
      }
305
      }
306
    }
306
    }
307
    TaskReporter reporter = startReporter(umbilical);
307
    TaskReporter reporter = startReporter(umbilical);
308
 
308
 
309
    boolean useNewApi = job.getUseNewMapper();
309
    boolean useNewApi = job.getUseNewMapper();
310
    initialize(job, getJobID(), reporter, useNewApi);
310
    initialize(job, getJobID(), reporter, useNewApi);
311

    
   
311

   
312
    // check if it is a cleanupJobTask
312
    // check if it is a cleanupJobTask
313
    if (jobCleanup) {
313
    if (jobCleanup) {
314
      runJobCleanupTask(umbilical, reporter);
314
      runJobCleanupTask(umbilical, reporter);
315
      return;
315
      return;
316
    }
316
    }
317
    if (jobSetup) {
317
    if (jobSetup) {
318
      runJobSetupTask(umbilical, reporter);
318
      runJobSetupTask(umbilical, reporter);
319
      return;
319
      return;
320
    }
320
    }
321
    if (taskCleanup) {
321
    if (taskCleanup) {
322
      runTaskCleanupTask(umbilical, reporter);
322
      runTaskCleanupTask(umbilical, reporter);
323
      return;
323
      return;
324
    }
324
    }
325

    
   
325

   
326
    if (useNewApi) {
326
    if (useNewApi) {
327
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
327
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
328
    } else {
328
    } else {
329
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
329
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
330
    }
330
    }
331
    done(umbilical, reporter);
331
    done(umbilical, reporter);
332
  }
332
  }
333

    
   
333

   
334
 @SuppressWarnings("unchecked")
334
 @SuppressWarnings("unchecked")
335
 private <T> T getSplitDetails(Path file, long offset) 
335
 private <T> T getSplitDetails(Path file, long offset) 
336
  throws IOException {
336
  throws IOException {
337
   FileSystem fs = file.getFileSystem(conf);
337
   FileSystem fs = file.getFileSystem(conf);
338
   FSDataInputStream inFile = fs.open(file);
338
   FSDataInputStream inFile = fs.open(file);
339
   inFile.seek(offset);
339
   inFile.seek(offset);
340
   String className = Text.readString(inFile);
340
   String className = Text.readString(inFile);
341
   Class<T> cls;
341
   Class<T> cls;
342
   try {
342
   try {
343
     cls = (Class<T>) conf.getClassByName(className);
343
     cls = (Class<T>) conf.getClassByName(className);
344
   } catch (ClassNotFoundException ce) {
344
   } catch (ClassNotFoundException ce) {
345
     IOException wrap = new IOException("Split class " + className + 
345
     IOException wrap = new IOException("Split class " + className + 
346
                                         " not found");
346
                                         " not found");
347
     wrap.initCause(ce);
347
     wrap.initCause(ce);
348
     throw wrap;
348
     throw wrap;
349
   }
349
   }
350
   SerializationFactory factory = new SerializationFactory(conf);
350
   SerializationFactory factory = new SerializationFactory(conf);
351
   Deserializer<T> deserializer = 
351
   Deserializer<T> deserializer = 
352
     (Deserializer<T>) factory.getDeserializer(cls);
352
     (Deserializer<T>) factory.getDeserializer(cls);
353
   deserializer.open(inFile);
353
   deserializer.open(inFile);
354
   T split = deserializer.deserialize(null);
354
   T split = deserializer.deserialize(null);
355
   long pos = inFile.getPos();
355
   long pos = inFile.getPos();
356
   getCounters().findCounter(
356
   getCounters().findCounter(
357
       TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
357
       TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
358
   inFile.close();
358
   inFile.close();
359
   return split;
359
   return split;
360
 }
360
 }
361
  
361
  
362
  @SuppressWarnings("unchecked")
362
  @SuppressWarnings("unchecked")
363
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
363
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
364
  void runOldMapper(final JobConf job,
364
  void runOldMapper(final JobConf job,
365
                    final TaskSplitIndex splitIndex,
365
                    final TaskSplitIndex splitIndex,
366
                    final TaskUmbilicalProtocol umbilical,
366
                    final TaskUmbilicalProtocol umbilical,
367
                    TaskReporter reporter
367
                    TaskReporter reporter
368
                    ) throws IOException, InterruptedException,
368
                    ) throws IOException, InterruptedException,
369
                             ClassNotFoundException {
369
                             ClassNotFoundException {
370
    InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
370
    InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
371
           splitIndex.getStartOffset());
371
           splitIndex.getStartOffset());
372

    
   
372

   
373
    updateJobWithSplit(job, inputSplit);
373
    updateJobWithSplit(job, inputSplit);
374
    reporter.setInputSplit(inputSplit);
374
    reporter.setInputSplit(inputSplit);
375

    
   
375

   
376
    RecordReader<INKEY,INVALUE> rawIn =                  // open input
376
    RecordReader<INKEY,INVALUE> rawIn =                  // open input
377
      job.getInputFormat().getRecordReader(inputSplit, job, reporter);
377
      job.getInputFormat().getRecordReader(inputSplit, job, reporter);
378
    RecordReader<INKEY,INVALUE> in = isSkipping() ? 
378
    RecordReader<INKEY,INVALUE> in = isSkipping() ? 
379
        new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
379
        new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
380
        new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter);
380
        new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter);
381
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
381
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
382

    
   
382

   
383

    
   
383

   
384
    int numReduceTasks = conf.getNumReduceTasks();
384
    int numReduceTasks = conf.getNumReduceTasks();
385
    LOG.info("numReduceTasks: " + numReduceTasks);
385
    LOG.info("numReduceTasks: " + numReduceTasks);
386
    MapOutputCollector collector = null;
386
    MapOutputCollector collector = null;
387
    if (numReduceTasks > 0) {
387
    if (numReduceTasks > 0) {
388
      collector = new MapOutputBuffer(umbilical, job, reporter);
388
      collector = new MapOutputBuffer(umbilical, job, reporter);
389
    } else { 
389
    } else { 
390
      collector = new DirectMapOutputCollector(umbilical, job, reporter);
390
      collector = new DirectMapOutputCollector(umbilical, job, reporter);
391
    }
391
    }
392
    MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
392
    MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
393
      ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
393
      ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
394

    
   
394

   
395
    try {
395
    try {
396
      runner.run(in, new OldOutputCollector(collector, conf), reporter);
396
      runner.run(in, new OldOutputCollector(collector, conf), reporter);
397
      mapPhase.complete();
397
      mapPhase.complete();
398
      // start the sort phase only if there are reducers
398
      // start the sort phase only if there are reducers
399
      if (numReduceTasks > 0) {
399
      if (numReduceTasks > 0) {
400
        setPhase(TaskStatus.Phase.SORT);
400
        setPhase(TaskStatus.Phase.SORT);
401
      }
401
      }
402
      statusUpdate(umbilical);
402
      statusUpdate(umbilical);
403
      collector.flush();
403
      collector.flush();
404
    } finally {
404
    } finally {
405
      //close
405
      //close
406
      in.close();                               // close input
406
      in.close();                               // close input
407
      collector.close();
407
      collector.close();
408
    }
408
    }
409
  }
409
  }
410

    
   
410

   
411
  /**
411
  /**
412
   * Update the job with details about the file split
412
   * Update the job with details about the file split
413
   * @param job the job configuration to update
413
   * @param job the job configuration to update
414
   * @param inputSplit the file split
414
   * @param inputSplit the file split
415
   */
415
   */
416
  private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) {
416
  private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) {
417
    if (inputSplit instanceof FileSplit) {
417
    if (inputSplit instanceof FileSplit) {
418
      FileSplit fileSplit = (FileSplit) inputSplit;
418
      FileSplit fileSplit = (FileSplit) inputSplit;
419
      job.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString());
419
      job.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString());
420
      job.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
420
      job.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
421
      job.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength());
421
      job.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength());
422
    }
422
    }
423
  }
423
  }
424

    
   
424

   
425
  static class NewTrackingRecordReader<K,V> 
425
  static class NewTrackingRecordReader<K,V> 
426
    extends org.apache.hadoop.mapreduce.RecordReader<K,V> {
426
    extends org.apache.hadoop.mapreduce.RecordReader<K,V> {
427
    private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
427
    private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
428
    private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
428
    private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
429
    private final TaskReporter reporter;
429
    private final TaskReporter reporter;
430
    
430
    
431
    NewTrackingRecordReader(org.apache.hadoop.mapreduce.RecordReader<K,V> real,
431
    NewTrackingRecordReader(org.apache.hadoop.mapreduce.RecordReader<K,V> real,
432
                            TaskReporter reporter) {
432
                            TaskReporter reporter) {
433
      this.real = real;
433
      this.real = real;
434
      this.reporter = reporter;
434
      this.reporter = reporter;
435
      this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
435
      this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
436
    }
436
    }
437

    
   
437

   
438
    @Override
438
    @Override
439
    public void close() throws IOException {
439
    public void close() throws IOException {
440
      real.close();
440
      real.close();
441
    }
441
    }
442

    
   
442

   
443
    @Override
443
    @Override
444
    public K getCurrentKey() throws IOException, InterruptedException {
444
    public K getCurrentKey() throws IOException, InterruptedException {
445
      return real.getCurrentKey();
445
      return real.getCurrentKey();
446
    }
446
    }
447

    
   
447

   
448
    @Override
448
    @Override
449
    public V getCurrentValue() throws IOException, InterruptedException {
449
    public V getCurrentValue() throws IOException, InterruptedException {
450
      return real.getCurrentValue();
450
      return real.getCurrentValue();
451
    }
451
    }
452

    
   
452

   
453
    @Override
453
    @Override
454
    public float getProgress() throws IOException, InterruptedException {
454
    public float getProgress() throws IOException, InterruptedException {
455
      return real.getProgress();
455
      return real.getProgress();
456
    }
456
    }
457

    
   
457

   
458
    @Override
458
    @Override
459
    public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
459
    public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
460
                           org.apache.hadoop.mapreduce.TaskAttemptContext context
460
                           org.apache.hadoop.mapreduce.TaskAttemptContext context
461
                           ) throws IOException, InterruptedException {
461
                           ) throws IOException, InterruptedException {
462
      real.initialize(split, context);
462
      real.initialize(split, context);
463
    }
463
    }
464

    
   
464

   
465
    @Override
465
    @Override
466
    public boolean nextKeyValue() throws IOException, InterruptedException {
466
    public boolean nextKeyValue() throws IOException, InterruptedException {
467
      boolean result = real.nextKeyValue();
467
      boolean result = real.nextKeyValue();
468
      if (result) {
468
      if (result) {
469
        inputRecordCounter.increment(1);
469
        inputRecordCounter.increment(1);
470
      }
470
      }
471
      reporter.setProgress(getProgress());
471
      reporter.setProgress(getProgress());
472
      return result;
472
      return result;
473
    }
473
    }
474
  }
474
  }
475

    
   
475

   
476
  /**
476
  /**
477
   * Since the mapred and mapreduce Partitioners don't share a common interface
477
   * Since the mapred and mapreduce Partitioners don't share a common interface
478
   * (JobConfigurable is deprecated and a subtype of mapred.Partitioner), the
478
   * (JobConfigurable is deprecated and a subtype of mapred.Partitioner), the
479
   * partitioner lives in Old/NewOutputCollector. Note that, for map-only jobs,
479
   * partitioner lives in Old/NewOutputCollector. Note that, for map-only jobs,
480
   * the configured partitioner should not be called. It's common for
480
   * the configured partitioner should not be called. It's common for
481
   * partitioners to compute a result mod numReduces, which causes a div0 error
481
   * partitioners to compute a result mod numReduces, which causes a div0 error
482
   */
482
   */
483
  private static class OldOutputCollector<K,V> implements OutputCollector<K,V> {
483
  private static class OldOutputCollector<K,V> implements OutputCollector<K,V> {
484
    private final Partitioner<K,V> partitioner;
484
    private final Partitioner<K,V> partitioner;
485
    private final MapOutputCollector<K,V> collector;
485
    private final MapOutputCollector<K,V> collector;
486
    private final int numPartitions;
486
    private final int numPartitions;
487

    
   
487

   
488
    @SuppressWarnings("unchecked")
488
    @SuppressWarnings("unchecked")
489
    OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) {
489
    OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) {
490
      numPartitions = conf.getNumReduceTasks();
490
      numPartitions = conf.getNumReduceTasks();
491
      if (numPartitions > 1) {
491
      if (numPartitions > 1) {
492
        partitioner = (Partitioner<K,V>)
492
        partitioner = (Partitioner<K,V>)
493
          ReflectionUtils.newInstance(conf.getPartitionerClass(), conf);
493
          ReflectionUtils.newInstance(conf.getPartitionerClass(), conf);
494
      } else {
494
      } else {
495
        partitioner = new Partitioner<K,V>() {
495
        partitioner = new Partitioner<K,V>() {
496
          @Override
496
          @Override
497
          public void configure(JobConf job) { }
497
          public void configure(JobConf job) { }
498
          @Override
498
          @Override
499
          public int getPartition(K key, V value, int numPartitions) {
499
          public int getPartition(K key, V value, int numPartitions) {
500
            return numPartitions - 1;
500
            return numPartitions - 1;
501
          }
501
          }
502
        };
502
        };
503
      }
503
      }
504
      this.collector = collector;
504
      this.collector = collector;
505
    }
505
    }
506

    
   
506

   
507
    @Override
507
    @Override
508
    public void collect(K key, V value) throws IOException {
508
    public void collect(K key, V value) throws IOException {
509
      try {
509
      try {
510
        collector.collect(key, value,
510
        collector.collect(key, value,
511
                          partitioner.getPartition(key, value, numPartitions));
511
                          partitioner.getPartition(key, value, numPartitions));
512
      } catch (InterruptedException ie) {
512
      } catch (InterruptedException ie) {
513
        Thread.currentThread().interrupt();
513
        Thread.currentThread().interrupt();
514
        throw new IOException("interrupt exception", ie);
514
        throw new IOException("interrupt exception", ie);
515
      }
515
      }
516
    }
516
    }
517
  }
517
  }
518

    
   
518

   
519
  private class NewDirectOutputCollector<K,V>
519
  private class NewDirectOutputCollector<K,V>
520
  extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
520
  extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
521
    private final org.apache.hadoop.mapreduce.RecordWriter out;
521
    private final org.apache.hadoop.mapreduce.RecordWriter out;
522

    
   
522

   
523
    private final TaskReporter reporter;
523
    private final TaskReporter reporter;
524

    
   
524

   
525
    private final Counters.Counter mapOutputRecordCounter;
525
    private final Counters.Counter mapOutputRecordCounter;
526
    
526
    
527
    @SuppressWarnings("unchecked")
527
    @SuppressWarnings("unchecked")
528
    NewDirectOutputCollector(MRJobConfig jobContext,
528
    NewDirectOutputCollector(MRJobConfig jobContext,
529
        JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) 
529
        JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) 
530
    throws IOException, ClassNotFoundException, InterruptedException {
530
    throws IOException, ClassNotFoundException, InterruptedException {
531
      this.reporter = reporter;
531
      this.reporter = reporter;
532
      out = outputFormat.getRecordWriter(taskContext);
532
      out = outputFormat.getRecordWriter(taskContext);
533
      mapOutputRecordCounter = 
533
      mapOutputRecordCounter = 
534
        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
534
        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
535
    }
535
    }
536

    
   
536

   
537
    @Override
537
    @Override
538
    @SuppressWarnings("unchecked")
538
    @SuppressWarnings("unchecked")
539
    public void write(K key, V value) 
539
    public void write(K key, V value) 
540
    throws IOException, InterruptedException {
540
    throws IOException, InterruptedException {
541
      reporter.progress();
541
      reporter.progress();
542
      out.write(key, value);
542
      out.write(key, value);
543
      mapOutputRecordCounter.increment(1);
543
      mapOutputRecordCounter.increment(1);
544
    }
544
    }
545

    
   
545

   
546
    @Override
546
    @Override
547
    public void close(TaskAttemptContext context) 
547
    public void close(TaskAttemptContext context) 
548
    throws IOException,InterruptedException {
548
    throws IOException,InterruptedException {
549
      reporter.progress();
549
      reporter.progress();
550
      if (out != null) {
550
      if (out != null) {
551
        out.close(context);
551
        out.close(context);
552
      }
552
      }
553
    }
553
    }
554
  }
554
  }
555
  
555
  
556
  private class NewOutputCollector<K,V>
556
  private class NewOutputCollector<K,V>
557
    extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
557
    extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
558
    private final MapOutputCollector<K,V> collector;
558
    private final MapOutputCollector<K,V> collector;
559
    private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
559
    private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
560
    private final int partitions;
560
    private final int partitions;
561

    
   
561

   
562
    @SuppressWarnings("unchecked")
562
    @SuppressWarnings("unchecked")
563
    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
563
    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
564
                       JobConf job,
564
                       JobConf job,
565
                       TaskUmbilicalProtocol umbilical,
565
                       TaskUmbilicalProtocol umbilical,
566
                       TaskReporter reporter
566
                       TaskReporter reporter
567
                       ) throws IOException, ClassNotFoundException {
567
                       ) throws IOException, ClassNotFoundException {
568
      collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
568
      collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
569
      partitions = jobContext.getNumReduceTasks();
569
      partitions = jobContext.getNumReduceTasks();
570
      if (partitions > 1) {
570
      if (partitions > 1) {
571
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
571
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
572
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
572
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
573
      } else {
573
      } else {
574
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
574
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
575
          @Override
575
          @Override
576
          public int getPartition(K key, V value, int numPartitions) {
576
          public int getPartition(K key, V value, int numPartitions) {
577
            return partitions - 1;
577
            return partitions - 1;
578
          }
578
          }
579
        };
579
        };
580
      }
580
      }
581
    }
581
    }
582

    
   
582

   
583
    @Override
583
    @Override
584
    public void write(K key, V value) throws IOException, InterruptedException {
584
    public void write(K key, V value) throws IOException, InterruptedException {
585
      collector.collect(key, value,
585
      collector.collect(key, value,
586
                        partitioner.getPartition(key, value, partitions));
586
                        partitioner.getPartition(key, value, partitions));
587
    }
587
    }
588

    
   
588

   
589
    @Override
589
    @Override
590
    public void close(TaskAttemptContext context
590
    public void close(TaskAttemptContext context
591
                      ) throws IOException,InterruptedException {
591
                      ) throws IOException,InterruptedException {
592
      try {
592
      try {
593
        collector.flush();
593
        collector.flush();
594
      } catch (ClassNotFoundException cnf) {
594
      } catch (ClassNotFoundException cnf) {
595
        throw new IOException("can't find class ", cnf);
595
        throw new IOException("can't find class ", cnf);
596
      }
596
      }
597
      collector.close();
597
      collector.close();
598
    }
598
    }
599
  }
599
  }
600

    
   
600

   
601
  @SuppressWarnings("unchecked")
601
  @SuppressWarnings("unchecked")
602
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
602
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
603
  void runNewMapper(final JobConf job,
603
  void runNewMapper(final JobConf job,
604
                    final TaskSplitIndex splitIndex,
604
                    final TaskSplitIndex splitIndex,
605
                    final TaskUmbilicalProtocol umbilical,
605
                    final TaskUmbilicalProtocol umbilical,
606
                    TaskReporter reporter
606
                    TaskReporter reporter
607
                    ) throws IOException, ClassNotFoundException,
607
                    ) throws IOException, ClassNotFoundException,
608
                             InterruptedException {
608
                             InterruptedException {
609
    // make a task context so we can get the classes
609
    // make a task context so we can get the classes
610
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
610
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
611
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
611
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
612
                                                                  getTaskID(),
612
                                                                  getTaskID(),
613
                                                                  reporter);
613
                                                                  reporter);
614
    // make a mapper
614
    // make a mapper
615
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
615
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
616
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
616
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
617
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
617
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
618
    // make the input format
618
    // make the input format
619
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
619
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
620
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
620
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
621
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
621
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
622
    // rebuild the input split
622
    // rebuild the input split
623
    org.apache.hadoop.mapreduce.InputSplit split = null;
623
    org.apache.hadoop.mapreduce.InputSplit split = null;
624
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
624
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
625
        splitIndex.getStartOffset());
625
        splitIndex.getStartOffset());
626

    
   
626

   
627
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
627
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
628
      new NewTrackingRecordReader<INKEY,INVALUE>
628
      new NewTrackingRecordReader<INKEY,INVALUE>
629
          (inputFormat.createRecordReader(split, taskContext), reporter);
629
          (inputFormat.createRecordReader(split, taskContext), reporter);
630
    
630
    
631
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
631
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
632
    org.apache.hadoop.mapreduce.RecordWriter output = null;
632
    org.apache.hadoop.mapreduce.RecordWriter output = null;
633
    
633
    
634
    // get an output object
634
    // get an output object
635
    if (job.getNumReduceTasks() == 0) {
635
    if (job.getNumReduceTasks() == 0) {
636
      output = 
636
      output = 
637
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
637
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
638
    } else {
638
    } else {
639
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
639
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
640
    }
640
    }
641

    
   
641

   
642
    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
642
    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
643
    mapContext = 
643
    mapContext = 
644
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
644
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
645
          input, output, 
645
          input, output, 
646
          committer, 
646
          committer, 
647
          reporter, split);
647
          reporter, split);
648

    
   
648

   
649
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
649
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
650
        mapperContext = 
650
        mapperContext = 
651
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
651
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
652
              mapContext);
652
              mapContext);
653

    
   
653

   
654
    input.initialize(split, mapperContext);
654
    input.initialize(split, mapperContext);
655
    mapper.run(mapperContext);
655
    mapper.run(mapperContext);
656
    mapPhase.complete();
656
    mapPhase.complete();
657
    setPhase(TaskStatus.Phase.SORT);
657
    setPhase(TaskStatus.Phase.SORT);
658
    statusUpdate(umbilical);
658
    statusUpdate(umbilical);
659
    input.close();
659
    input.close();
660
    output.close(mapperContext);
660
    output.close(mapperContext);
661
  }
661
  }
662

    
   
662

   
663
  interface MapOutputCollector<K, V> {
663
  interface MapOutputCollector<K, V> {
664

    
   
664

   
665
    public void collect(K key, V value, int partition
665
    public void collect(K key, V value, int partition
666
                        ) throws IOException, InterruptedException;
666
                        ) throws IOException, InterruptedException;
667
    public void close() throws IOException, InterruptedException;
667
    public void close() throws IOException, InterruptedException;
668
    
668
    
669
    public void flush() throws IOException, InterruptedException, 
669
    public void flush() throws IOException, InterruptedException, 
670
                               ClassNotFoundException;
670
                               ClassNotFoundException;
671
        
671
        
672
  }
672
  }
  class DirectMapOutputCollector<K, V>
    implements MapOutputCollector<K, V> {

    private RecordWriter<K, V> out = null;

    private TaskReporter reporter = null;

    private final Counters.Counter mapOutputRecordCounter;

    @SuppressWarnings("unchecked")
    public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
        JobConf job, TaskReporter reporter) throws IOException {
      this.reporter = reporter;
      String finalName = getOutputName(getPartition());
      FileSystem fs = FileSystem.get(job);

      out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);

      mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
    }

    public void close() throws IOException {
      if (this.out != null) {
        out.close(this.reporter);
      }
    }

    public void flush() throws IOException, InterruptedException,
                               ClassNotFoundException {
    }

    public void collect(K key, V value, int partition) throws IOException {
      reporter.progress();
      out.write(key, value);
      mapOutputRecordCounter.increment(1);
    }
  }
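
  /**
   * Collector used when the job has one or more reduces: serialized records
   * and their metadata share a single circular buffer (kvbuffer), which is
   * sorted and spilled to disk as the soft limit is crossed.
   */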
  private class MapOutputBuffer<K extends Object, V extends Object>
      implements MapOutputCollector<K, V>, IndexedSortable {
    final int partitions;
    final JobConf job;
    final TaskReporter reporter;
    final Class<K> keyClass;
    final Class<V> valClass;
    final RawComparator<K> comparator;
    final SerializationFactory serializationFactory;
    final Serializer<K> keySerializer;
    final Serializer<V> valSerializer;
    final CombinerRunner<K,V> combinerRunner;
    final CombineOutputCollector<K, V> combineCollector;

    // Compression for map-outputs
    final CompressionCodec codec;

    // k/v accounting
    final IntBuffer kvmeta; // metadata overlay on backing store
    int kvstart;            // marks origin of spill metadata
    int kvend;              // marks end of spill metadata
    int kvindex;            // marks end of fully serialized records

    int equator;            // marks origin of meta/serialization
    int bufstart;           // marks beginning of spill
    int bufend;             // marks beginning of collectable
    int bufmark;            // marks end of record
    int bufindex;           // marks end of collected
    int bufvoid;            // marks the point where we should stop
                            // reading at the end of the buffer

    byte[] kvbuffer;        // main output buffer
    private final byte[] b0 = new byte[0];

    private static final int INDEX = 0;            // index offset in acct
    private static final int VALSTART = 1;         // val offset in acct
    private static final int KEYSTART = 2;         // key offset in acct
    private static final int PARTITION = 3;        // partition offset in acct
    private static final int NMETA = 4;            // num meta ints
    private static final int METASIZE = NMETA * 4; // size in bytes

    // spill accounting
    final int maxRec;
    final int softLimit;
    boolean spillInProgress;
    int bufferRemaining;
    volatile Throwable sortSpillException = null;

    int numSpills = 0;
    final int minSpillsForCombine;
    final IndexedSorter sorter;
    final ReentrantLock spillLock = new ReentrantLock();
    final Condition spillDone = spillLock.newCondition();
    final Condition spillReady = spillLock.newCondition();
    final BlockingBuffer bb = new BlockingBuffer();
    volatile boolean spillThreadRunning = false;
    final SpillThread spillThread = new SpillThread();

    final FileSystem rfs;

    // Counters
    final Counters.Counter mapOutputByteCounter;
    final Counters.Counter mapOutputRecordCounter;

    final ArrayList<SpillRecord> indexCacheList =
      new ArrayList<SpillRecord>();
    private int totalIndexCacheMemory;
    private int indexCacheMemoryLimit;
    private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;

    @SuppressWarnings("unchecked")
    public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
                           TaskReporter reporter
                           ) throws IOException, ClassNotFoundException {
      this.job = job;
      this.reporter = reporter;
      partitions = job.getNumReduceTasks();
      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

      // sanity checks
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
      if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
            "\": " + spillper);
      }
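      // the mask check requires 0 <= sortmb <= 2047 (11 bits), so that
      // (sortmb << 20) below cannot overflow a signed 32-bit int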
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
      }
      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
            QuickSort.class, IndexedSorter.class), job);
      // buffers and accounting
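      // round the buffer size down to a multiple of METASIZE so that the
      // int view used for metadata (kvmeta) divides the byte array evenly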
      int maxMemUsage = sortmb << 20;
      maxMemUsage -= maxMemUsage % METASIZE;
      kvbuffer = new byte[maxMemUsage];
      bufvoid = kvbuffer.length;
      kvmeta = ByteBuffer.wrap(kvbuffer).asIntBuffer();
      setEquator(0);
      bufstart = bufend = bufindex = equator;
      kvstart = kvend = kvindex;

      maxRec = kvmeta.capacity() / NMETA;
      softLimit = (int)(kvbuffer.length * spillper);
      bufferRemaining = softLimit;
      if (LOG.isInfoEnabled()) {
        LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
        LOG.info("soft limit at " + softLimit);
        LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
        LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
      }

      // k/v serialization
      comparator = job.getOutputKeyComparator();
      keyClass = (Class<K>)job.getMapOutputKeyClass();
      valClass = (Class<V>)job.getMapOutputValueClass();
      serializationFactory = new SerializationFactory(job);
      keySerializer = serializationFactory.getSerializer(keyClass);
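      // fail fast with a descriptive error when the configured
      // io.serializations factories cannot handle the map-output classes;
      // the open() calls below would otherwise fail with a bare NPE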
      if (this.keySerializer == null) {
        throw new IOException("Could not find a serializer for the Map-Output Key class: '"
            + keyClass.getCanonicalName() + "'.");
      }
      keySerializer.open(bb);
      valSerializer = serializationFactory.getSerializer(valClass);
      if (this.valSerializer == null) {
        throw new IOException("Could not find a serializer for the Map-Output Value class: '"
            + valClass.getCanonicalName() + "'.");
      }
      valSerializer.open(bb);

      // output counters
      mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
      mapOutputRecordCounter =
        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);

      // compression
      if (job.getCompressMapOutput()) {
        Class<? extends CompressionCodec> codecClass =
          job.getMapOutputCompressorClass(DefaultCodec.class);
        codec = ReflectionUtils.newInstance(codecClass, job);
      } else {
        codec = null;
      }

      // combiner
      final Counters.Counter combineInputCounter =
        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
      combinerRunner = CombinerRunner.create(job, getTaskID(),
                                             combineInputCounter,
                                             reporter, null);
      if (combinerRunner != null) {
        final Counters.Counter combineOutputCounter =
          reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        combineCollector = new CombineOutputCollector<K,V>(combineOutputCounter);
      } else {
        combineCollector = null;
      }
      spillInProgress = false;
      minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
      spillThread.setDaemon(true);
      spillThread.setName("SpillThread");
      spillLock.lock();
      try {
        spillThread.start();
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
      } finally {
        spillLock.unlock();
      }
      if (sortSpillException != null) {
        throw new IOException("Spill thread failed to initialize",
            sortSpillException);
      }
    }

    
   
892

   
887
    /**
893
    /**
888
     * Serialize the key, value to intermediate storage.
894
     * Serialize the key, value to intermediate storage.
889
     * When this method returns, kvindex must refer to sufficient unused
895
     * When this method returns, kvindex must refer to sufficient unused
890
     * storage to store one METADATA.
896
     * storage to store one METADATA.
891
     */
897
     */
892
    public synchronized void collect(K key, V value, final int partition
898
    public synchronized void collect(K key, V value, final int partition
893
                                     ) throws IOException {
899
                                     ) throws IOException {
894
      reporter.progress();
900
      reporter.progress();
895
      if (key.getClass() != keyClass) {
901
      if (key.getClass() != keyClass) {
896
        throw new IOException("Type mismatch in key from map: expected "
902
        throw new IOException("Type mismatch in key from map: expected "
897
                              + keyClass.getName() + ", received "
903
                              + keyClass.getName() + ", received "
898
                              + key.getClass().getName());
904
                              + key.getClass().getName());
899
      }
905
      }
900
      if (value.getClass() != valClass) {
906
      if (value.getClass() != valClass) {
901
        throw new IOException("Type mismatch in value from map: expected "
907
        throw new IOException("Type mismatch in value from map: expected "
902
                              + valClass.getName() + ", received "
908
                              + valClass.getName() + ", received "
903
                              + value.getClass().getName());
909
                              + value.getClass().getName());
904
      }
910
      }
905
      if (partition < 0 || partition >= partitions) {
911
      if (partition < 0 || partition >= partitions) {
906
        throw new IOException("Illegal partition for " + key + " (" +
912
        throw new IOException("Illegal partition for " + key + " (" +
907
            partition + ")");
913
            partition + ")");
908
      }
914
      }
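      // each record reserves METASIZE bytes of accounting space up front;
      // exhausting bufferRemaining triggers the spill/limit checks below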
      checkSpillException();
      bufferRemaining -= METASIZE;
      if (bufferRemaining <= 0) {
        // start spill if the thread is not running and the soft limit has been
        // reached
        spillLock.lock();
        try {
          do {
            if (!spillInProgress) {
              final int kvbidx = 4 * kvindex;
              final int kvbend = 4 * kvend;
              // serialized, unspilled bytes always lie between kvindex and
              // bufindex, crossing the equator. Note that any void space
              // created by a reset must be included in "used" bytes
              final int bUsed = distanceTo(kvbidx, bufindex);
              final boolean bufsoftlimit = bUsed >= softLimit;
              if ((kvbend + METASIZE) % kvbuffer.length !=
                  equator - (equator % METASIZE)) {
                // spill finished, reclaim space
                resetSpill();
                bufferRemaining = Math.min(
                    distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                    softLimit - bUsed) - METASIZE;
                continue;
              } else if (bufsoftlimit && kvindex != kvend) {
                // spill records, if any collected; check latter, as it may
                // be possible for metadata alignment to hit spill pcnt
                startSpill();
                final int avgRec = (int)
                  (mapOutputByteCounter.getCounter() /
                  mapOutputRecordCounter.getCounter());
                // leave at least half the split buffer for serialization data
                // ensure that kvindex >= bufindex
                final int distkvi = distanceTo(bufindex, kvbidx);
                final int newPos = (bufindex +
                  Math.max(2 * METASIZE - 1,
                          Math.min(distkvi / 2,
                                   distkvi / (METASIZE + avgRec) * METASIZE)))
                  % kvbuffer.length;
                setEquator(newPos);
                bufmark = bufindex = newPos;
                final int serBound = 4 * kvend;
                // bytes remaining before the lock must be held and limits
                // checked is the minimum of three arcs: the metadata space, the
                // serialization space, and the soft limit
                bufferRemaining = Math.min(
                    // metadata max
                    distanceTo(bufend, newPos),
                    Math.min(
                      // serialization max
                      distanceTo(newPos, serBound),
                      // soft limit
                      softLimit)) - 2 * METASIZE;
              }
            }
          } while (false);
        } finally {
          spillLock.unlock();
        }
      }

      try {
        // serialize key bytes into buffer
        int keystart = bufindex;
        keySerializer.serialize(key);
        if (bufindex < keystart) {
          // wrapped the key; must make contiguous
          bb.shiftBufferedKey();
          keystart = 0;
        }
        // serialize value bytes into buffer
        final int valstart = bufindex;
        valSerializer.serialize(value);
        // It's possible for records to have zero length, i.e. the serializer
        // will perform no writes. To ensure that the boundary conditions are
        // checked and that the kvindex invariant is maintained, perform a
        // zero-length write into the buffer. The logic monitoring this could be
        // moved into collect, but this is cleaner and inexpensive. For now, it
        // is acceptable.
        bb.write(b0, 0, 0);

        // the record must be marked after the preceding write, as the metadata
        // for this record are not yet written
        int valend = bb.markRecord();

        mapOutputRecordCounter.increment(1);
        mapOutputByteCounter.increment(
            distanceTo(keystart, valend, bufvoid));

        // write accounting info
        kvmeta.put(kvindex + INDEX, kvindex);
        kvmeta.put(kvindex + PARTITION, partition);
        kvmeta.put(kvindex + KEYSTART, keystart);
        kvmeta.put(kvindex + VALSTART, valstart);
        // advance kvindex
        kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
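        // note: metadata is allocated downward from the equator (kvindex
        // steps back by NMETA, modulo capacity) while serialized key/value
        // bytes grow upward from it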
      } catch (MapBufferTooSmallException e) {
        LOG.info("Record too large for in-memory buffer: " + e.getMessage());
        spillSingleRecord(key, value, partition);
        mapOutputRecordCounter.increment(1);
        return;
      }
    }
1018

   
1013
    /**
1019
    /**
1014
     * Set the point from which meta and serialization data expand. The meta
1020
     * Set the point from which meta and serialization data expand. The meta
1015
     * indices are aligned with the buffer, so metadata never spans the ends of
1021
     * indices are aligned with the buffer, so metadata never spans the ends of
1016
     * the circular buffer.
1022
     * the circular buffer.
1017
     */
1023
     */
1018
    private void setEquator(int pos) {
1024
    private void setEquator(int pos) {
1019
      equator = pos;
1025
      equator = pos;
1020
      // set index prior to first entry, aligned at meta boundary
1026
      // set index prior to first entry, aligned at meta boundary
1021
      final int aligned = pos - (pos % METASIZE);
1027
      final int aligned = pos - (pos % METASIZE);
1022
      kvindex =
1028
      kvindex =
1023
        ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
1029
        ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
1024
      if (LOG.isInfoEnabled()) {
1030
      if (LOG.isInfoEnabled()) {
1025
        LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
1031
        LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
1026
            "(" + (kvindex * 4) + ")");
1032
            "(" + (kvindex * 4) + ")");
1027
      }
1033
      }
1028
    }
1034
    }
1029

    
   
1035

   

    /**
     * The spill is complete, so set the buffer and meta indices to be equal to
     * the new equator to free space for continuing collection. Note that when
     * kvindex == kvend == kvstart, the buffer is empty.
     */
    private void resetSpill() {
      final int e = equator;
      bufstart = bufend = e;
      final int aligned = e - (e % METASIZE);
      // set start/end to point to first meta record
      kvstart = kvend =
        ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
      if (LOG.isInfoEnabled()) {
        LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
          (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
      }
    }

    /**
     * Compute the distance in bytes between two indices in the serialization
     * buffer.
     * @see #distanceTo(int,int,int)
     */
    final int distanceTo(final int i, final int j) {
      return distanceTo(i, j, kvbuffer.length);
    }

    /**
     * Compute the distance between two indices in the circular buffer given the
     * max distance.
     */
    int distanceTo(final int i, final int j, final int mod) {
      return i <= j
        ? j - i
        : mod - i + j;
    }

    
   
1072

   
1067
    /**
1073
    /**
1068
     * For the given meta position, return the dereferenced position in the
1074
     * For the given meta position, return the dereferenced position in the
1069
     * integer array. Each meta block contains several integers describing
1075
     * integer array. Each meta block contains several integers describing
1070
     * record data in its serialized form, but the INDEX is not necessarily
1076
     * record data in its serialized form, but the INDEX is not necessarily
1071
     * related to the proximate metadata. The index value at the referenced int
1077
     * related to the proximate metadata. The index value at the referenced int
1072
     * position is the start offset of the associated metadata block. So the
1078
     * position is the start offset of the associated metadata block. So the
1073
     * metadata INDEX at metapos may point to the metadata described by the
1079
     * metadata INDEX at metapos may point to the metadata described by the
1074
     * metadata block at metapos + k, which contains information about that
1080
     * metadata block at metapos + k, which contains information about that
1075
     * serialized record.
1081
     * serialized record.
1076
     */
1082
     */
1077
    int offsetFor(int metapos) {
1083
    int offsetFor(int metapos) {
1078
      return kvmeta.get(metapos * NMETA + INDEX);
1084
      return kvmeta.get(metapos * NMETA + INDEX);
1079
    }
1085
    }
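
    // a record's key length is recovered below as VALSTART - KEYSTART,
    // since collect() writes the value bytes immediately after the key bytes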

    /**
     * Compare logical range, st i, j MOD offset capacity.
     * Compare by partition, then by key.
     * @see IndexedSortable#compare
     */
    public int compare(final int mi, final int mj) {
      final int kvi = offsetFor(mi % maxRec);
      final int kvj = offsetFor(mj % maxRec);
      final int kvip = kvmeta.get(kvi + PARTITION);
      final int kvjp = kvmeta.get(kvj + PARTITION);
      // sort by partition
      if (kvip != kvjp) {
        return kvip - kvjp;
      }
      // sort by key
      return comparator.compare(kvbuffer,
          kvmeta.get(kvi + KEYSTART),
          kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),
          kvbuffer,
          kvmeta.get(kvj + KEYSTART),
          kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
    }

    /**
     * Swap logical indices st i, j MOD offset capacity.
     * @see IndexedSortable#swap
     */
    public void swap(final int mi, final int mj) {
      final int kvi = (mi % maxRec) * NMETA + INDEX;
      final int kvj = (mj % maxRec) * NMETA + INDEX;
      int tmp = kvmeta.get(kvi);
      kvmeta.put(kvi, kvmeta.get(kvj));
      kvmeta.put(kvj, tmp);
    }
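
    // the sort therefore permutes only the 4-byte INDEX entries; the other
    // metadata ints and the serialized record bytes themselves never move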

    /**
     * Inner class managing the spill of serialized records to disk.
     */
    protected class BlockingBuffer extends DataOutputStream {

      public BlockingBuffer() {
        super(new Buffer());
      }

      /**
       * Mark end of record. Note that this is required if the buffer is to
       * cut the spill in the proper place.
       */
      public int markRecord() {
        bufmark = bufindex;
        return bufindex;
      }

      /**
       * Set position from last mark to end of writable buffer, then rewrite
       * the data between last mark and kvindex.
       * This handles a special case where the key wraps around the buffer.
       * If the key is to be passed to a RawComparator, then it must be
       * contiguous in the buffer. This recopies the data in the buffer back
       * into itself, but starting at the beginning of the buffer. Note that
       * this method should <b>only</b> be called immediately after detecting
       * this condition. To call it at any other time is undefined and would
       * likely result in data loss or corruption.
       * @see #markRecord()
       */
      protected void shiftBufferedKey() throws IOException {
        // spillLock unnecessary; both kvend and kvindex are current
        int headbytelen = bufvoid - bufmark;
        bufvoid = bufmark;
        final int kvbidx = 4 * kvindex;
        final int kvbend = 4 * kvend;
        final int avail =
          Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
        if (bufindex + headbytelen < avail) {
          System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
          System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
          bufindex += headbytelen;
          bufferRemaining -= kvbuffer.length - bufvoid;
        } else {
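          // too little room to make the key contiguous in place: copy the
          // wrapped tail aside, then replay head and tail through the
          // blocking buffer, which may block until a spill frees space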
          byte[] keytmp = new byte[bufindex];
          System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
          bufindex = 0;
          out.write(kvbuffer, bufmark, headbytelen);
          out.write(keytmp);
        }
      }
    }

    public class Buffer extends OutputStream {
      private final byte[] scratch = new byte[1];

      @Override
      public void write(int v)
          throws IOException {
        scratch[0] = (byte)v;
        write(scratch, 0, 1);
      }

      /**
       * Attempt to write a sequence of bytes to the collection buffer.
       * This method will block if the spill thread is running and it
       * cannot write.
       * @throws MapBufferTooSmallException if record is too large to
       *    deserialize into the collection buffer.
       */
      @Override
      public void write(byte b[], int off, int len)
          throws IOException {
        // must always verify the invariant that at least METASIZE bytes are
        // available beyond kvindex, even when len == 0
        bufferRemaining -= len;
        if (bufferRemaining <= 0) {
          // writing these bytes could exhaust available buffer space or fill
          // the buffer to soft limit. check if spill or blocking are necessary
          boolean blockwrite = false;
          spillLock.lock();
          try {
            do {
              checkSpillException();

              final int kvbidx = 4 * kvindex;
              final int kvbend = 4 * kvend;
              // ser distance to key index
              final int distkvi = distanceTo(bufindex, kvbidx);
              // ser distance to spill end index
              final int distkve = distanceTo(bufindex, kvbend);

              // if kvindex is closer than kvend, then a spill is neither in
              // progress nor complete and reset since the lock was held. The
              // write should block only if there is insufficient space to
              // complete the current write, write the metadata for this record,
              // and write the metadata for the next record. If kvend is closer,
              // then the write should block if there is too little space for
              // either the metadata or the current write. Note that collect
              // ensures its metadata requirement with a zero-length write
              blockwrite = distkvi <= distkve
                ? distkvi <= len + 2 * METASIZE
                : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;

              if (!spillInProgress) {
                if (blockwrite) {
                  if ((kvbend + METASIZE) % kvbuffer.length !=
                      equator - (equator % METASIZE)) {
                    // spill finished, reclaim space
                    // need to use meta exclusively; zero-len rec & 100% spill
                    // pcnt would fail
                    resetSpill(); // resetSpill doesn't move bufindex, kvindex
                    bufferRemaining = Math.min(
                        distkvi - 2 * METASIZE,
                        softLimit - distanceTo(kvbidx, bufindex)) - len;
                    continue;
                  }
                  // we have records we can spill; only spill if blocked
                  if (kvindex != kvend) {
                    startSpill();
                    // Blocked on this write, waiting for the spill just
                    // initiated to finish. Instead of repositioning the marker
                    // and copying the partial record, we set the record start
                    // to be the new equator
                    setEquator(bufmark);
                  } else {
                    // We have no buffered records, and this record is too large
                    // to write into kvbuffer. We must spill it directly from
                    // collect
                    final int size = distanceTo(bufstart, bufindex) + len;
                    setEquator(0);
                    bufstart = bufend = bufindex = equator;
                    kvstart = kvend = kvindex;
                    bufvoid = kvbuffer.length;
                    throw new MapBufferTooSmallException(size + " bytes");
                  }
                }
              }

              if (blockwrite) {
                // wait for spill
                try {
                  while (spillInProgress) {
                    reporter.progress();
                    spillDone.await();
                  }
                } catch (InterruptedException e) {
                    throw new IOException(
                        "Buffer interrupted while waiting for the writer", e);
                }
              }
            } while (blockwrite);
          } finally {
            spillLock.unlock();
          }
        }
        // here, we know that we have sufficient space to write
        if (bufindex + len > bufvoid) {
          final int gaplen = bufvoid - bufindex;
          System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
          len -= gaplen;
          off += gaplen;
          bufindex = 0;
        }
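        // any remaining bytes now fit contiguously below bufvoid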
        System.arraycopy(b, off, kvbuffer, bufindex, len);
        bufindex += len;
      }
    }
    public void flush() throws IOException, ClassNotFoundException,
1291
    public void flush() throws IOException, ClassNotFoundException,
1286
           InterruptedException {
1292
           InterruptedException {
1287
      LOG.info("Starting flush of map output");
1293
      LOG.info("Starting flush of map output");
1288
      spillLock.lock();
1294
      spillLock.lock();
1289
      try {
1295
      try {
1290
        while (spillInProgress) {
1296
        while (spillInProgress) {
1291
          reporter.progress();
1297
          reporter.progress();
1292
          spillDone.await();
1298
          spillDone.await();
1293
        }
1299
        }
1294
        checkSpillException();
1300
        checkSpillException();
1295

    
   
1301

   
1296
        final int kvbend = 4 * kvend;
1302
        final int kvbend = 4 * kvend;
1297
        if ((kvbend + METASIZE) % kvbuffer.length !=
1303
        if ((kvbend + METASIZE) % kvbuffer.length !=
1298
            equator - (equator % METASIZE)) {
1304
            equator - (equator % METASIZE)) {
1299
          // spill finished
1305
          // spill finished
1300
          resetSpill();
1306
          resetSpill();
1301
        }
1307
        }
1302
        if (kvindex != kvend) {
1308
        if (kvindex != kvend) {
1303
          kvend = (kvindex + NMETA) % kvmeta.capacity();
1309
          kvend = (kvindex + NMETA) % kvmeta.capacity();
1304
          bufend = bufmark;
1310
          bufend = bufmark;
1305
          if (LOG.isInfoEnabled()) {
1311
          if (LOG.isInfoEnabled()) {
1306
            LOG.info("Spilling map output");
1312
            LOG.info("Spilling map output");
1307
            LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
1313
            LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
1308
                     "; bufvoid = " + bufvoid);
1314
                     "; bufvoid = " + bufvoid);
1309
            LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
1315
            LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
1310
                     "); kvend = " + kvend + "(" + (kvend * 4) +
1316
                     "); kvend = " + kvend + "(" + (kvend * 4) +
1311
                     "); length = " + (distanceTo(kvend, kvstart,
1317
                     "); length = " + (distanceTo(kvend, kvstart,
1312
                           kvmeta.capacity()) + 1) + "/" + maxRec);
1318
                           kvmeta.capacity()) + 1) + "/" + maxRec);
1313
          }
1319
          }
1314
          sortAndSpill();
1320
          sortAndSpill();
1315
        }
1321
        }
1316
      } catch (InterruptedException e) {
1322
      } catch (InterruptedException e) {
1317
        throw new IOException("Interrupted while waiting for the writer", e);
1323
        throw new IOException("Interrupted while waiting for the writer", e);
1318
      } finally {
1324
      } finally {
1319
        spillLock.unlock();
1325
        spillLock.unlock();
1320
      }
1326
      }
1321
      assert !spillLock.isHeldByCurrentThread();
1327
      assert !spillLock.isHeldByCurrentThread();
1322
      // shut down spill thread and wait for it to exit. Since the preceding
1328
      // shut down spill thread and wait for it to exit. Since the preceding
1323
      // ensures that it is finished with its work (and sortAndSpill did not
1329
      // ensures that it is finished with its work (and sortAndSpill did not
1324
      // throw), we elect to use an interrupt instead of setting a flag.
1330
      // throw), we elect to use an interrupt instead of setting a flag.
1325
      // Spilling simultaneously from this thread while the spill thread
1331
      // Spilling simultaneously from this thread while the spill thread
1326
      // finishes its work might be both a useful way to extend this and also
1332
      // finishes its work might be both a useful way to extend this and also
1327
      // sufficient motivation for the latter approach.
1333
      // sufficient motivation for the latter approach.
1328
      try {
1334
      try {
1329
        spillThread.interrupt();
1335
        spillThread.interrupt();
1330
        spillThread.join();
1336
        spillThread.join();
1331
      } catch (InterruptedException e) {
1337
      } catch (InterruptedException e) {
1332
        throw new IOException("Spill failed", e);
1338
        throw new IOException("Spill failed", e);
1333
      }
1339
      }
1334
      // release sort buffer before the merge
1340
      // release sort buffer before the merge
1335
      kvbuffer = null;
1341
      kvbuffer = null;
1336
      mergeParts();
1342
      mergeParts();
1337
    }
1343
    }
1338

    
   
1344

   
    public void close() { }

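    // SpillThread runs concurrently with collection: each pass signals
    // spillDone, waits on spillReady until startSpill() publishes a
    // buffer span, runs sortAndSpill() with spillLock released, and
    // finally retakes the lock to mark the spilled span reclaimed.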
    protected class SpillThread extends Thread {

      @Override
      public void run() {
        spillLock.lock();
        spillThreadRunning = true;
        try {
          while (true) {
            spillDone.signal();
            while (!spillInProgress) {
              spillReady.await();
            }
            try {
              spillLock.unlock();
              sortAndSpill();
            } catch (Throwable t) {
              sortSpillException = t;
            } finally {
              spillLock.lock();
              if (bufend < bufstart) {
                bufvoid = kvbuffer.length;
              }
              kvstart = kvend;
              bufstart = bufend;
              spillInProgress = false;
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          spillLock.unlock();
          spillThreadRunning = false;
        }
      }
    }

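    // Surfaces any Throwable recorded by the spill thread: an Error is
    // first reported as a fatal task failure, and everything is then
    // rethrown wrapped in an IOException on the collector thread.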
    private void checkSpillException() throws IOException {
      final Throwable lspillException = sortSpillException;
      if (lspillException != null) {
        if (lspillException instanceof Error) {
          final String logMsg = "Task " + getTaskID() + " failed : " +
            StringUtils.stringifyException(lspillException);
          reportFatalError(getTaskID(), lspillException, logMsg);
        }
        throw new IOException("Spill failed", lspillException);
      }
    }

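    // Marks the span to spill (everything collected up to kvindex and
    // bufmark) and wakes the spill thread via spillReady.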
    private void startSpill() {
      assert !spillInProgress;
      kvend = (kvindex + NMETA) % kvmeta.capacity();
      bufend = bufmark;
      spillInProgress = true;
      if (LOG.isInfoEnabled()) {
        LOG.info("Spilling map output");
        LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
                 "; bufvoid = " + bufvoid);
        LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
                 "); kvend = " + kvend + "(" + (kvend * 4) +
                 "); length = " + (distanceTo(kvend, kvstart,
                       kvmeta.capacity()) + 1) + "/" + maxRec);
      }
      spillReady.signal();
    }

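    // Sorts the records accumulated since the last spill and writes one
    // IFile segment per partition into a fresh spill file, running the
    // combiner per partition when one is configured.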
    private void sortAndSpill() throws IOException, ClassNotFoundException,
                                       InterruptedException {
      //approximate the length of the output file to be the length of the
      //buffer + header lengths for the partitions
      final long size = (bufend >= bufstart
          ? bufend - bufstart
          : (bufvoid - bufend) + bufstart) +
                  partitions * APPROX_HEADER_LENGTH;
      FSDataOutputStream out = null;
      try {
        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename =
            mapOutputFile.getSpillFileForWrite(numSpills, size);
        out = rfs.create(filename);

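        // The metadata to spill lies between kvend and kvstart on the
        // circular kvmeta buffer; mstart/mend express that span in
        // whole-record units for the sorter (the +1 counts the record
        // at kvend itself).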
        final int mstart = kvend / NMETA;
        final int mend = 1 + // kvend is a valid record
          (kvstart >= kvend
          ? kvstart
          : kvmeta.capacity() + kvstart) / NMETA;
        sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
        int spindex = mstart;
        final IndexRecord rec = new IndexRecord();
        final InMemValBytes value = new InMemValBytes();
        for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          try {
            long segmentStart = out.getPos();
            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                      spilledRecordsCounter);
            if (combinerRunner == null) {
              // spill directly
              DataInputBuffer key = new DataInputBuffer();
              while (spindex < mend &&
                  kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                final int kvoff = offsetFor(spindex % maxRec);
                key.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
                          (kvmeta.get(kvoff + VALSTART) -
                           kvmeta.get(kvoff + KEYSTART)));
                getVBytesForOffset(kvoff, value);
                writer.append(key, value);
                ++spindex;
              }
            } else {
              int spstart = spindex;
              while (spindex < mend &&
                  kvmeta.get(offsetFor(spindex % maxRec)
                            + PARTITION) == i) {
                ++spindex;
              }
              // Note: we would like to avoid the combiner if we've fewer
              // than some threshold of records for a partition
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }
            }

            // close the writer
            writer.close();

            // record offsets
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            spillRec.putIndex(rec, i);

            writer = null;
          } finally {
            if (null != writer) writer.close();
          }
        }

        if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
          // create spill index file
          Path indexFilename =
              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);
        } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        LOG.info("Finished spill " + numSpills);
        ++numSpills;
      } finally {
        if (out != null) out.close();
      }
    }

    /**
     * Handles the degenerate case where serialization fails to fit in
     * the in-memory buffer, so we must spill the record from collect
     * directly to a spill file. Consider this "losing".
     */
    private void spillSingleRecord(final K key, final V value,
                                   int partition) throws IOException {
      long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
      FSDataOutputStream out = null;
      try {
        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename =
            mapOutputFile.getSpillFileForWrite(numSpills, size);
        out = rfs.create(filename);

        // we don't run the combiner for a single record
        IndexRecord rec = new IndexRecord();
        for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          try {
            long segmentStart = out.getPos();
            // Create a new codec, don't care!
            writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
                                            spilledRecordsCounter);

            if (i == partition) {
              final long recordStart = out.getPos();
              writer.append(key, value);
              // Note that our map byte count will not be accurate with
              // compression
              mapOutputByteCounter.increment(out.getPos() - recordStart);
            }
            writer.close();

            // record offsets
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            spillRec.putIndex(rec, i);

            writer = null;
          } catch (IOException e) {
            if (null != writer) writer.close();
            throw e;
          }
        }
        if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
          // create spill index file
          Path indexFilename =
              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);
        } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        ++numSpills;
      } finally {
        if (out != null) out.close();
      }
    }

    /**
     * Given an offset, populate vbytes with the associated set of
     * deserialized value bytes. Should only be called during a spill.
     */
    private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
      // get the keystart for the next serialized value to be the end
      // of this value. If this is the last value in the buffer, use bufend
      final int nextindex = kvoff == kvend
        ? bufend
        : kvmeta.get(
            (kvoff - NMETA + kvmeta.capacity() + KEYSTART) % kvmeta.capacity());
      // calculate the length of the value
      int vallen = (nextindex >= kvmeta.get(kvoff + VALSTART))
        ? nextindex - kvmeta.get(kvoff + VALSTART)
        : (bufvoid - kvmeta.get(kvoff + VALSTART)) + nextindex;
      vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen);
    }

    /**
     * Inner class wrapping valuebytes, used for appendRaw.
     */
    protected class InMemValBytes extends DataInputBuffer {
      private byte[] buffer;
      private int start;
      private int length;

      public void reset(byte[] buffer, int start, int length) {
        this.buffer = buffer;
        this.start = start;
        this.length = length;

1601
        if (start + length > bufvoid) {
1596
          this.buffer = new byte[this.length];
1602
          this.buffer = new byte[this.length];
1597
          final int taillen = bufvoid - start;
1603
          final int taillen = bufvoid - start;
1598
          System.arraycopy(buffer, start, this.buffer, 0, taillen);
1604
          System.arraycopy(buffer, start, this.buffer, 0, taillen);
1599
          System.arraycopy(buffer, 0, this.buffer, taillen, length-taillen);
1605
          System.arraycopy(buffer, 0, this.buffer, taillen, length-taillen);
1600
          this.start = 0;
1606
          this.start = 0;
1601
        }
1607
        }
1602

    
   
1608

   
1603
        super.reset(this.buffer, this.start, this.length);
1609
        super.reset(this.buffer, this.start, this.length);
1604
      }
1610
      }
1605
    }
1611
    }
1606

    
   
1612

   
1607
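    // RawKeyValueIterator over a sorted range of in-memory records;
    // feeds the combiner during a spill.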
    protected class MRResultIterator implements RawKeyValueIterator {
      private final DataInputBuffer keybuf = new DataInputBuffer();
      private final InMemValBytes vbytes = new InMemValBytes();
      private final int end;
      private int current;
      public MRResultIterator(int start, int end) {
        this.end = end;
        current = start - 1;
      }
      public boolean next() throws IOException {
        return ++current < end;
      }
      public DataInputBuffer getKey() throws IOException {
        final int kvoff = offsetFor(current % maxRec);
        keybuf.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
            kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART));
        return keybuf;
      }
      public DataInputBuffer getValue() throws IOException {
        getVBytesForOffset(offsetFor(current % maxRec), vbytes);
        return vbytes;
      }
      public Progress getProgress() {
        return null;
      }
      public void close() { }
    }

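    // Merges all spill files into the single partitioned final output
    // ("file.out") and its index. One spill is promoted by rename; zero
    // spills produce an empty segment per partition.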
    private void mergeParts() throws IOException, InterruptedException,
                                     ClassNotFoundException {
      // get the approximate size of the final output/index files
      long finalOutFileSize = 0;
      long finalIndexFileSize = 0;
      final Path[] filename = new Path[numSpills];
      final TaskAttemptID mapId = getTaskID();

      for(int i = 0; i < numSpills; i++) {
        filename[i] = mapOutputFile.getSpillFile(i);
        finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
      }
      if (numSpills == 1) { //the spill is the final output
        rfs.rename(filename[0],
            new Path(filename[0].getParent(), "file.out"));
        if (indexCacheList.size() == 0) {
          rfs.rename(mapOutputFile.getSpillIndexFile(0),
              new Path(filename[0].getParent(),"file.out.index"));
        } else {
          indexCacheList.get(0).writeToFile(
                new Path(filename[0].getParent(),"file.out.index"), job);
        }
        return;
      }

      // read in paged indices
      for (int i = indexCacheList.size(); i < numSpills; ++i) {
        Path indexFileName = mapOutputFile.getSpillIndexFile(i);
        indexCacheList.add(new SpillRecord(indexFileName, job));
      }

      //make correction in the length to include the sequence file header
      //lengths for each partition
      finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
      finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
      Path finalOutputFile =
          mapOutputFile.getOutputFileForWrite(finalOutFileSize);
      Path finalIndexFile =
          mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);

      //The output stream for the final single output file
      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

      if (numSpills == 0) {
        //create dummy files
        IndexRecord rec = new IndexRecord();
        SpillRecord sr = new SpillRecord(partitions);
        try {
          for (int i = 0; i < partitions; i++) {
            long segmentStart = finalOut.getPos();
            Writer<K, V> writer =
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
            writer.close();
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            sr.putIndex(rec, i);
          }
          sr.writeToFile(finalIndexFile, job);
        } finally {
          finalOut.close();
        }
        return;
      }
      }
1699
      {
1705
      {
1700
        sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
1706
        sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
1701
        Merger.considerFinalMergeForProgress();
1707
        Merger.considerFinalMergeForProgress();
1702
        
1708
        
1703
        IndexRecord rec = new IndexRecord();
1709
        IndexRecord rec = new IndexRecord();
1704
        final SpillRecord spillRec = new SpillRecord(partitions);
1710
        final SpillRecord spillRec = new SpillRecord(partitions);
1705
        for (int parts = 0; parts < partitions; parts++) {
1711
        for (int parts = 0; parts < partitions; parts++) {
1706
          //create the segments to be merged
1712
          //create the segments to be merged
1707
          List<Segment<K,V>> segmentList =
1713
          List<Segment<K,V>> segmentList =
1708
            new ArrayList<Segment<K, V>>(numSpills);
1714
            new ArrayList<Segment<K, V>>(numSpills);
1709
          for(int i = 0; i < numSpills; i++) {
1715
          for(int i = 0; i < numSpills; i++) {
1710
            IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
1716
            IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
1711

    
   
1717

   
1712
            Segment<K,V> s =
1718
            Segment<K,V> s =
1713
              new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
1719
              new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
1714
                               indexRecord.partLength, codec, true);
1720
                               indexRecord.partLength, codec, true);
1715
            segmentList.add(i, s);
1721
            segmentList.add(i, s);
1716

    
   
1722

   
1717
            if (LOG.isDebugEnabled()) {
1723
            if (LOG.isDebugEnabled()) {
1718
              LOG.debug("MapId=" + mapId + " Reducer=" + parts +
1724
              LOG.debug("MapId=" + mapId + " Reducer=" + parts +
1719
                  "Spill =" + i + "(" + indexRecord.startOffset + "," +
1725
                  "Spill =" + i + "(" + indexRecord.startOffset + "," +
1720
                  indexRecord.rawLength + ", " + indexRecord.partLength + ")");
1726
                  indexRecord.rawLength + ", " + indexRecord.partLength + ")");
1721
            }
1727
            }
1722
          }
1728
          }
1723

    
   
1729

   
1724
          int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);
1730
          int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);
1725
          // sort the segments only if there are intermediate merges
1731
          // sort the segments only if there are intermediate merges
1726
          boolean sortSegments = segmentList.size() > mergeFactor;
1732
          boolean sortSegments = segmentList.size() > mergeFactor;
1727
          //merge
1733
          //merge
1728
          @SuppressWarnings("unchecked")
1734
          @SuppressWarnings("unchecked")
1729
          RawKeyValueIterator kvIter = Merger.merge(job, rfs,
1735
          RawKeyValueIterator kvIter = Merger.merge(job, rfs,
1730
                         keyClass, valClass, codec,
1736
                         keyClass, valClass, codec,
1731
                         segmentList, mergeFactor,
1737
                         segmentList, mergeFactor,
1732
                         new Path(mapId.toString()),
1738
                         new Path(mapId.toString()),
1733
                         job.getOutputKeyComparator(), reporter, sortSegments,
1739
                         job.getOutputKeyComparator(), reporter, sortSegments,
1734
                         null, spilledRecordsCounter, sortPhase.phase());
1740
                         null, spilledRecordsCounter, sortPhase.phase());
1735

    
   
1741

   
1736
          //write merged output to disk
1742
          //write merged output to disk
1737
          long segmentStart = finalOut.getPos();
1743
          long segmentStart = finalOut.getPos();
1738
          Writer<K, V> writer =
1744
          Writer<K, V> writer =
1739
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
1745
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
1740
                               spilledRecordsCounter);
1746
                               spilledRecordsCounter);
1741
          if (combinerRunner == null || numSpills < minSpillsForCombine) {
1747
          if (combinerRunner == null || numSpills < minSpillsForCombine) {
1742
            Merger.writeFile(kvIter, writer, reporter, job);
1748
            Merger.writeFile(kvIter, writer, reporter, job);
1743
          } else {
1749
          } else {
1744
            combineCollector.setWriter(writer);
1750
            combineCollector.setWriter(writer);
1745
            combinerRunner.combine(kvIter, combineCollector);
1751
            combinerRunner.combine(kvIter, combineCollector);
1746
          }
1752
          }
1747

    
   
1753

   
1748
          //close
1754
          //close
1749
          writer.close();
1755
          writer.close();
1750

    
   
1756

   
1751
          sortPhase.startNextPhase();
1757
          sortPhase.startNextPhase();
1752
          
1758
          
1753
          // record offsets
1759
          // record offsets
1754
          rec.startOffset = segmentStart;
1760
          rec.startOffset = segmentStart;
1755
          rec.rawLength = writer.getRawLength();
1761
          rec.rawLength = writer.getRawLength();
1756
          rec.partLength = writer.getCompressedLength();
1762
          rec.partLength = writer.getCompressedLength();
1757
          spillRec.putIndex(rec, parts);
1763
          spillRec.putIndex(rec, parts);
1758
        }
1764
        }
1759
        spillRec.writeToFile(finalIndexFile, job);
1765
        spillRec.writeToFile(finalIndexFile, job);
1760
        finalOut.close();
1766
        finalOut.close();
1761
        for(int i = 0; i < numSpills; i++) {
1767
        for(int i = 0; i < numSpills; i++) {
1762
          rfs.delete(filename[i],true);
1768
          rfs.delete(filename[i],true);
1763
        }
1769
        }
1764
      }
1770
      }
1765
    }
1771
    }
1766

    
   
1772

   
1767
  } // MapOutputBuffer
1773
  } // MapOutputBuffer
1768
  
1774
  
1769
  /**
1775
  /**
1770
   * Exception indicating that the allocated sort buffer is insufficient
1776
   * Exception indicating that the allocated sort buffer is insufficient
1771
   * to hold the current record.
1777
   * to hold the current record.
1772
   */
1778
   */
1773
  @SuppressWarnings("serial")
1779
  @SuppressWarnings("serial")
1774
  private static class MapBufferTooSmallException extends IOException {
1780
  private static class MapBufferTooSmallException extends IOException {
1775
    public MapBufferTooSmallException(String s) {
1781
    public MapBufferTooSmallException(String s) {
1776
      super(s);
1782
      super(s);
1777
    }
1783
    }
1778
  }
1784
  }
1779

    
   
1785

   
1780
}
1786
}
src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
Revision 751d528 New Change
 
src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java
Revision 5fa329a New Change
 