Review Board 1.7.22


SQOOP-863 Sqoop2: Introduce ProgressThread into Extractor and Loader

Review Request #9845 - Created March 11, 2013 and submitted

Jarek Cecho
SQOOP-863
Reviewers
Sqoop
sqoop-sqoop2
I've ported ProgressThread from Sqoop1 and plug it into SqoopMapper and SqoopReducer.
Unit and integration tests seems to be passing. I've also verified the functionality on real cluster.
execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressThread.java
New File

    
   
1
/**

    
   
2
 * Licensed to the Apache Software Foundation (ASF) under one

    
   
3
 * or more contributor license agreements.  See the NOTICE file

    
   
4
 * distributed with this work for additional information

    
   
5
 * regarding copyright ownership.  The ASF licenses this file

    
   
6
 * to you under the Apache License, Version 2.0 (the

    
   
7
 * "License"); you may not use this file except in compliance

    
   
8
 * with the License.  You may obtain a copy of the License at

    
   
9
 *

    
   
10
 *     http://www.apache.org/licenses/LICENSE-2.0

    
   
11
 *

    
   
12
 * Unless required by applicable law or agreed to in writing, software

    
   
13
 * distributed under the License is distributed on an "AS IS" BASIS,

    
   
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

    
   
15
 * See the License for the specific language governing permissions and

    
   
16
 * limitations under the License.

    
   
17
 */

    
   
18
package org.apache.sqoop.job.mr;

    
   
19

   

    
   
20
import org.apache.commons.logging.Log;

    
   
21
import org.apache.commons.logging.LogFactory;

    
   
22
import org.apache.hadoop.conf.Configuration;

    
   
23
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

    
   
24

   

    
   
25

   

    
   
26
/**

    
   
27
  * Run the task process for auto-progress tasks.

    
   
28
  */

    
   
29
public class ProgressThread extends Thread {

    
   
30

   

    
   
31
  public static final Log LOG = LogFactory.getLog(ProgressThread.class);

    
   
32

   

    
   
33
  /**

    
   
34
   * Total number of millis for which progress will be reported by the

    
   
35
   * auto-progress thread. If this is zero, then the auto-progress thread will

    
   
36
   * never voluntarily exit.

    
   
37
   */

    
   
38
  private int maxProgressPeriod;

    
   
39

   

    
   
40
  /**

    
   
41
   * Number of milliseconds to sleep for between loop iterations. Must be less

    
   
42
   * than report interval.

    
   
43
   */

    
   
44
  private int sleepInterval;

    
   
45

   

    
   
46
  /**

    
   
47
   * Number of milliseconds between calls to Reporter.progress().

    
   
48
   * Should be a multiple of the sleepInterval.

    
   
49
   */

    
   
50
  private int reportInterval;

    
   
51

   

    
   
52
  public static final String MAX_PROGRESS_PERIOD_KEY = "sqoop.mapred.auto.progress.max";

    
   
53
  public static final String SLEEP_INTERVAL_KEY = "sqoop.mapred.auto.progress.sleep";

    
   
54
  public static final String REPORT_INTERVAL_KEY = "sqoop.mapred.auto.progress.report";

    
   
55

   

    
   
56
  // Sleep for 10 seconds at a time.

    
   
57
  public static final int DEFAULT_SLEEP_INTERVAL = 10000;

    
   
58

   

    
   
59
  // Report progress every 30 seconds.

    
   
60
  public static final int DEFAULT_REPORT_INTERVAL = 30000;

    
   
61

   

    
   
62
  // Disable max progress, by default.

    
   
63
  public static final int DEFAULT_MAX_PROGRESS = 0;

    
   
64

   

    
   
65
  private volatile boolean keepGoing; // While this is true, thread runs.

    
   
66

   

    
   
67
  private TaskInputOutputContext context;

    
   
68
  private long startTimeMillis;

    
   
69
  private long lastReportMillis;

    
   
70

   

    
   
71
  public ProgressThread(final TaskInputOutputContext ctxt) {

    
   
72
    this.context = ctxt;

    
   
73
    this.keepGoing = true;

    
   
74
    configureAutoProgress(ctxt.getConfiguration());

    
   
75
  }

    
   
76

   

    
   
77
  /**

    
   
78
   * Set configuration parameters for the auto-progress thread.

    
   
79
   */

    
   
80
  private void configureAutoProgress(Configuration job) {

    
   
81
    this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY, DEFAULT_MAX_PROGRESS);

    
   
82
    this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL);

    
   
83
    this.reportInterval = job.getInt(REPORT_INTERVAL_KEY, DEFAULT_REPORT_INTERVAL);

    
   
84

   

    
   
85
    if (this.reportInterval < 1) {

    
   
86
      LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to " + DEFAULT_REPORT_INTERVAL);

    
   
87
      this.reportInterval = DEFAULT_REPORT_INTERVAL;

    
   
88
    }

    
   
89

   

    
   
90
    if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) {

    
   
91
      LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to " + DEFAULT_SLEEP_INTERVAL);

    
   
92
      this.sleepInterval = DEFAULT_SLEEP_INTERVAL;

    
   
93
    }

    
   
94

   

    
   
95
    if (this.maxProgressPeriod < 0) {

    
   
96
      LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to " + DEFAULT_MAX_PROGRESS);

    
   
97
      this.maxProgressPeriod = DEFAULT_MAX_PROGRESS;

    
   
98
    }

    
   
99
  }

    
   
100

   

    
   
101
  public void signalShutdown() {

    
   
102
    this.keepGoing = false; // volatile update.

    
   
103
    this.interrupt();

    
   
104
  }

    
   
105

   

    
   
106
  public void run() {

    
   
107
    this.lastReportMillis = System.currentTimeMillis();

    
   
108
    this.startTimeMillis = this.lastReportMillis;

    
   
109

   

    
   
110
    final long MAX_PROGRESS = this.maxProgressPeriod;

    
   
111
    final long REPORT_INTERVAL = this.reportInterval;

    
   
112
    final long SLEEP_INTERVAL = this.sleepInterval;

    
   
113

   

    
   
114
    // In a loop:

    
   
115
    //   * Check that we haven't run for too long (maxProgressPeriod).

    
   
116
    //   * If it's been a report interval since we last made progress,

    
   
117
    //     make more.

    
   
118
    //   * Sleep for a bit.

    
   
119
    //   * If the parent thread has signaled for exit, do so.

    
   
120
    while (this.keepGoing) {

    
   
121
      long curTimeMillis = System.currentTimeMillis();

    
   
122

   

    
   
123
      if (MAX_PROGRESS != 0 && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) {

    
   
124
        this.keepGoing = false;

    
   
125
        LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS + " ms.");

    
   
126
        break;

    
   
127
      }

    
   
128

   

    
   
129
      if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) {

    
   
130
        // It's been a full report interval -- claim progress.

    
   
131
        LOG.debug("Auto-progress thread reporting progress");

    
   
132
        this.context.progress();

    
   
133
        this.lastReportMillis = curTimeMillis;

    
   
134
      }

    
   
135

   

    
   
136
      // Unless we got an interrupt while we were working,

    
   
137
      // sleep a bit before doing more work.

    
   
138
      if (!Thread.interrupted()) {

    
   
139
        try {

    
   
140
          Thread.sleep(SLEEP_INTERVAL);

    
   
141
        } catch (InterruptedException ie) {

    
   
142
          // we were notified on something; not necessarily an error.

    
   
143
        }

    
   
144
      }

    
   
145
    }

    
   
146
    LOG.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing);

    
   
147
  }

    
   
148
}
execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
Revision 2a823032cfd91fb8fd1a8aee3ffd9d80c2e3bae6 New Change
 
execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
Revision d2361482771eea1a34b14c767952c5592f89c45a New Change
 
  1. execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressThread.java: Loading...
  2. execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java: Loading...
  3. execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java: Loading...