Review Board 1.7.22


SQOOP-738. Sqoop is not importing all data in Sqoop 2

Review Request #8542 - Created Dec. 12, 2012 and updated

Hari Shreedharan
SQOOP-738
Reviewers
Sqoop
sqoop-sqoop2
Threading updates to make sure close() returns only after the consumer thread completes, so that any asynchronous close/flush operations finish first.

 
execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
Revision 71b4724 New Change
[20] 16 lines
[+20]
17
 */
17
 */
18

    
   
18

   
19
package org.apache.sqoop.job.mr;
19
package org.apache.sqoop.job.mr;
20

    
   
20

   
21
import com.google.common.base.Throwables;
21
import com.google.common.base.Throwables;
22
import java.util.concurrent.ExecutionException;
22

   
23
import java.util.concurrent.Executors;
23
import java.io.IOException;
24
import java.util.concurrent.Future;
24
import java.util.concurrent.*;
25
import java.util.concurrent.Semaphore;
25

   

    
   
26
import com.google.common.util.concurrent.ThreadFactoryBuilder;
26
import org.apache.commons.logging.Log;
27
import org.apache.commons.logging.Log;
27
import org.apache.commons.logging.LogFactory;
28
import org.apache.commons.logging.LogFactory;
28
import org.apache.hadoop.conf.Configuration;
29
import org.apache.hadoop.conf.Configuration;
29
import org.apache.hadoop.io.NullWritable;
30
import org.apache.hadoop.io.NullWritable;
30
import org.apache.hadoop.mapreduce.JobContext;
31
import org.apache.hadoop.mapreduce.JobContext;
[+20] [20] 27 lines
[+20] [+] public class SqoopOutputFormatLoadExecutor {
58
    context = jobctx;
59
    context = jobctx;
59
    producer = new SqoopRecordWriter();
60
    producer = new SqoopRecordWriter();
60
  }
61
  }
61

    
   
62

   
62
  public RecordWriter<Data, NullWritable> getRecordWriter() {
63
  public RecordWriter<Data, NullWritable> getRecordWriter() {
63
    consumerFuture = Executors.newSingleThreadExecutor().submit(
64
    consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat

    
   
65
        ("OutputFormatLoader-consumer").build()).submit(
64
            new ConsumerThread());
66
            new ConsumerThread());
65
    return producer;
67
    return producer;
66
  }
68
  }
67

    
   
69

   
68
  /*
70
  /*
69
   * This is a producer-consumer problem and can be solved
71
   * This is a producer-consumer problem and can be solved
70
   * with two semaphores.
72
   * with two semaphores.
71
   */
73
   */
72
  private class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
74
  private class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
73

    
   
75

   
74
    @Override
76
    @Override
75
    public void write(Data key, NullWritable value) throws InterruptedException {
77
    public void write(Data key, NullWritable value) throws InterruptedException {
76
      checkConsumerCompletion();
78
      checkIfConsumerThrew();
77
      free.acquire();
79
      free.acquire();
78
      int type = key.getType();
80
      int type = key.getType();
79
      data.setContent(key.getContent(type), type);
81
      data.setContent(key.getContent(type), type);
80
      filled.release();
82
      filled.release();
81
    }
83
    }
82

    
   
84

   
83
    @Override
85
    @Override
84
    public void close(TaskAttemptContext context) throws InterruptedException {
86
    public void close(TaskAttemptContext context)
85
      checkConsumerCompletion();
87
            throws InterruptedException, IOException {
86
      free.acquire();
88
      free.acquire();
87
      writerFinished = true;
89
      writerFinished = true;
88
      // This will interrupt only the acquire call in the consumer class,
90
      filled.release();
89
      // since we have acquired the free semaphore, and close is called from
91
      waitForConsumer();
90
      // the same thread that writes - so filled has not been released since then

   
91
      // so the consumer is definitely blocked on the filled semaphore.

   
92
      consumerFuture.cancel(true);

   
93
    }
92
    }
94
  }
93
  }
95

    
   
94

   

    
   
95
  private void checkIfConsumerThrew() {

    
   
96
    if(readerFinished) {

    
   
97
      waitForConsumer();

    
   
98
    }

    
   
99
  }
96
  /**
100
  /**
97
   * This method checks if the reader thread has finished, and re-throw
101
   * This method checks if the reader thread has finished, and re-throw
98
   * any exceptions thrown by the reader thread.
102
   * any exceptions thrown by the reader thread.
99
   *
103
   *
100
   * @throws SqoopException if the consumer thread threw it.
104
   * @throws SqoopException if the consumer thread threw it.
101
   * @throws RuntimeException if some other exception was thrown.
105
   * @throws RuntimeException if some other exception was thrown.
102
   */
106
   */
103
  private void checkConsumerCompletion() {
107
  private void waitForConsumer() {
104
    if (readerFinished) {

   
105
      try {
108
    try {
106
        consumerFuture.get();
109
      consumerFuture.get();
107
      } catch (ExecutionException ex) {
110
    } catch (ExecutionException ex) {
108
        // In almost all cases, the exception will be SqoopException,
111
      // In almost all cases, the exception will be SqoopException,
109
        // because all exceptions are caught and propagated as
112
      // because all exceptions are caught and propagated as
110
        // SqoopExceptions
113
      // SqoopExceptions
111
        Throwable t = ex.getCause();
114
      Throwable t = ex.getCause();
112
        if(t instanceof SqoopException) {
115
      if (t instanceof SqoopException) {
113
          throw (SqoopException)t;
116
        throw (SqoopException) t;
114
        }
117
      }
115
        //In the rare case, it was not a SqoopException
118
      //In the rare case, it was not a SqoopException
116
        Throwables.propagate(t);
119
      Throwables.propagate(t);
117
      } catch (Exception ex) {
120
    } catch (Exception ex) {
118
        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019, ex);
121
      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019, ex);
119
      }
122
    }
120
    }
123
  }
121
  }

   
122

    
   
124

   
123
  private class OutputFormatDataReader extends DataReader {
125
  private class OutputFormatDataReader extends DataReader {
124
    @Override
126
    @Override
125
    public void setFieldDelimiter(char fieldDelimiter) {
127
    public void setFieldDelimiter(char fieldDelimiter) {
126
      data.setFieldDelimiter(fieldDelimiter);
128
      data.setFieldDelimiter(fieldDelimiter);
[+20] [20] 11 lines
[+20] [+] public String readCsvRecord() throws InterruptedException {
138

    
   
140

   
139
    @Override
141
    @Override
140
    public Object readContent(int type) throws InterruptedException {
142
    public Object readContent(int type) throws InterruptedException {
141
      // Has any more data been produced after I last consumed.
143
      // Has any more data been produced after I last consumed.
142
      // If no, wait for the producer to produce.
144
      // If no, wait for the producer to produce.
143
      if (writerFinished && (filled.availablePermits() == 0)) {

   
144
        return null;
Moved to 155

   
145
      }
Moved to 156

   
146
      try {
145
      try {
147
        filled.acquire();
146
        filled.acquire();
148
      } catch (InterruptedException ex) {
147
      } catch (InterruptedException ex) {
149
        if(writerFinished) {
148
        //Really at this point, there is nothing to do. Just throw and get out
150
          return null;
149
        LOG.error("Interrupted while waiting for data to be available from " +
151
        }
150
            "mapper", ex);
152
        throw ex;
151
        throw ex;
153
      }
152
      }

    
   
153
      // If the writer has finished, there is definitely no data remaining

    
   
154
      if (writerFinished) {
Moved from 144

    
   
155
        return null;
Moved from 145

    
   
156
      }
154
      Object content = data.getContent(type);
157
      Object content = data.getContent(type);
155
      free.release();
158
      free.release();
156
      return content;
159
      return content;
157
    }
160
    }
158
  }
161
  }
[+20] [20] 25 lines
[+20] [+] public void run() {
184
          subContext = new PrefixContext(conf, "");
187
          subContext = new PrefixContext(conf, "");
185
          configConnection = ConfigurationUtils.getFrameworkConnection(conf);
188
          configConnection = ConfigurationUtils.getFrameworkConnection(conf);
186
          configJob = ConfigurationUtils.getFrameworkJob(conf);
189
          configJob = ConfigurationUtils.getFrameworkJob(conf);
187
          break;
190
          break;
188
        default:
191
        default:

    
   
192
          readerFinished = true;
189
          throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
193
          throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
190
      }
194
      }
191

    
   
195

   
192
      try {
196
      try {
193
        loader.load(subContext, configConnection, configJob, reader);
197
        loader.load(subContext, configConnection, configJob, reader);
194
      } catch (Throwable t) {
198
      } catch (Throwable t) {

    
   
199
        readerFinished = true;
195
        LOG.error("Error while loading data out of MR job.", t);
200
        LOG.error("Error while loading data out of MR job.", t);
196
        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
201
        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
197
      }
202
      }
198

    
   
203

   
199
      // if no exception happens yet and reader finished before writer,
204
      // if no exception happens yet and reader finished before writer,
200
      // something went wrong
205
      // something went wrong
201
      if (!writerFinished) {
206
      if (!writerFinished) {
202
        // throw exception if data are not all consumed
207
        // throw exception if data are not all consumed

    
   
208
        readerFinished = true;
203
        LOG.error("Reader terminated, but writer is still running!");
209
        LOG.error("Reader terminated, but writer is still running!");
204
        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
210
        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
205

    
   
211

   
206
      }
212
      }
207
      // inform writer that reader is finished
213
      // inform writer that reader is finished
208
      readerFinished = true;
214
      readerFinished = true;
209
    }
215
    }
210
  }
216
  }
211
}
217
}
execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
Revision 9edf0ba New Change
 
  1. execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java: Loading...
  2. execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java: Loading...