Review Board 1.7.22


SQOOP-674. Exceptions in special map reduce threads can cause mapreduce job to freeze

Review Request #8657 - Created Dec. 18, 2012 and submitted

Hari Shreedharan
Sqoop2
SQOOP-674
Reviewers
Sqoop
sqoop-sqoop2
Release the semaphore when exceptions are thrown as well.
Added unit tests to test this and a few other cases.
execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
Revision 38e2292 New Change
[20] 72 lines
[+20] [+] public class SqoopOutputFormatLoadExecutor {
73
   */
73
   */
74
  private class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
74
  private class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
75

    
   
75

   
76
    @Override
76
    @Override
77
    public void write(Data key, NullWritable value) throws InterruptedException {
77
    public void write(Data key, NullWritable value) throws InterruptedException {
78
      checkIfConsumerThrew();

   
79
      free.acquire();
78
      free.acquire();

    
   
79
      checkIfConsumerThrew();
80
      int type = key.getType();
80
      int type = key.getType();
81
      data.setContent(key.getContent(type), type);
81
      data.setContent(key.getContent(type), type);
82
      filled.release();
82
      filled.release();
83
    }
83
    }
84

    
   
84

   
[+20] [20] 105 lines
[+20] [+] public void run() {
190
          configConnection = ConfigurationUtils.getFrameworkConnection(conf);
190
          configConnection = ConfigurationUtils.getFrameworkConnection(conf);
191
          configJob = ConfigurationUtils.getFrameworkJob(conf);
191
          configJob = ConfigurationUtils.getFrameworkJob(conf);
192
          break;
192
          break;
193
        default:
193
        default:
194
          readerFinished = true;
194
          readerFinished = true;

    
   
195
          // Release so that the writer can tell the framework something went

    
   
196
          // wrong.

    
   
197
          free.release();
195
          throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
198
          throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
196
      }
199
      }
197

    
   
200

   
198
      try {
201
      try {
199
        LOG.info("Running loader class " + loaderName);
202
        LOG.info("Running loader class " + loaderName);
200
        loader.load(subContext, configConnection, configJob, reader);
203
        loader.load(subContext, configConnection, configJob, reader);
201
        LOG.info("Loader has finished");
204
        LOG.info("Loader has finished");
202
      } catch (Throwable t) {
205
      } catch (Throwable t) {
203
        readerFinished = true;
206
        readerFinished = true;
204
        LOG.error("Error while loading data out of MR job.", t);
207
        LOG.error("Error while loading data out of MR job.", t);

    
   
208
        // Release so that the writer can tell the framework something went

    
   
209
        // wrong.

    
   
210
        free.release();
205
        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
211
        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
206
      }
212
      }
207

    
   
213

   
208
      // if no exception happens yet and reader finished before writer,
214
      // if no exception happens yet and reader finished before writer,
209
      // something went wrong
215
      // something went wrong
210
      if (!writerFinished) {
216
      if (!writerFinished) {
211
        // throw exception if data are not all consumed
217
        // throw exception if data are not all consumed
212
        readerFinished = true;
218
        readerFinished = true;
213
        LOG.error("Reader terminated, but writer is still running!");
219
        LOG.error("Reader terminated, but writer is still running!");

    
   
220
        // Release so that the writer can tell the framework something went

    
   
221
        // wrong.

    
   
222
        free.release();
214
        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
223
        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
215

    
   
224

   
216
      }
225
      }
217
      // inform writer that reader is finished
226
      // inform writer that reader is finished
218
      readerFinished = true;
227
      readerFinished = true;
219
    }
228
    }
220
  }
229
  }
221
}
230
}
execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
New File
 
  1. execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java: Loading...
  2. execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java: Loading...