Review Board 1.7.22


SQOOP-813 Sqoop2: LoaderExecutor might get into deadlock when exception is raised outside Loader itself

Review Request #8849 - Created Jan. 6, 2013 and submitted

Jarek Cecho
SQOOP-813
Reviewers
Sqoop
sqoop-sqoop2
I've extended the try-catch block to protect entire execution of LoaderExecutor.
Unit tests are passing and I've tested this on real cluster (in cracked code to trigger exception in framework code).
execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
Revision d158327 New Change
[20] 172 lines
[+20] [+] public Object readContent(int type) throws InterruptedException {
173

    
   
173

   
174
  private class ConsumerThread implements Runnable {
174
  private class ConsumerThread implements Runnable {
175

    
   
175

   
176
    @Override
176
    @Override
177
    public void run() {
177
    public void run() {

    
   
178
      LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");

    
   
179
      try {
178
      DataReader reader = new OutputFormatDataReader();
180
        DataReader reader = new OutputFormatDataReader();
179

    
   
181

   
180
      Configuration conf = null;
182
        Configuration conf = null;
181
      if (!isTest) {
183
        if (!isTest) {
182
        conf = context.getConfiguration();
184
          conf = context.getConfiguration();
183

    
   

   
184

    
   

   
185
        loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
185
          loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
186
      }
186
        }
187
      Loader loader = (Loader) ClassUtils.instantiate(loaderName);
187
        Loader loader = (Loader) ClassUtils.instantiate(loaderName);
188

    
   
188

   
189
      // Objects that should be pass to the Executor execution
189
        // Objects that should be pass to the Executor execution
[+20] [20] 12 lines
[+20] private class ConsumerThread implements Runnable {
202
            subContext = new PrefixContext(conf, "");
202
              subContext = new PrefixContext(conf, "");
203
            configConnection = ConfigurationUtils.getFrameworkConnection(conf);
203
              configConnection = ConfigurationUtils.getFrameworkConnection(conf);
204
            configJob = ConfigurationUtils.getFrameworkJob(conf);
204
              configJob = ConfigurationUtils.getFrameworkJob(conf);
205
            break;
205
              break;
206
          default:
206
            default:
207
            readerFinished = true;

   
208
            // Release so that the writer can tell the framework something went

   
209
            // wrong.

   
210
            free.release();

   
211
            throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
207
              throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
212
        }
208
          }
213
      }
209
        }
214

    
   
210

   
215
      try {

   
216
        LOG.info("Running loader class " + loaderName);
211
        LOG.info("Running loader class " + loaderName);
217
        loader.load(subContext, configConnection, configJob, reader);
212
        loader.load(subContext, configConnection, configJob, reader);
218
        LOG.info("Loader has finished");
213
        LOG.info("Loader has finished");
219
      } catch (Throwable t) {
214
      } catch (Throwable t) {
220
        readerFinished = true;
215
        readerFinished = true;
[+20] [20] 24 lines
  1. execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java: Loading...