Review Board 1.7.22


SQOOP-604 Easy throttling feature for MySQL exports

Review Request #7135 - Created Sept. 17, 2012 and submitted

Zoltán Tóth-Czifra
Reviewers
Sqoop
sqoop-trunk
Code review for SQOOP-604, see https://issues.apache.org/jira/browse/SQOOP-604

The solution in short: use the already existing "checkpoint" feature of direct (--direct) MySQL exports (the export process is restarted every X bytes written) and extend it with a new config value that simply makes the thread sleep for Y milliseconds at each checkpoint. With a low enough byte count limit this can be a simple yet powerful throttling mechanism.
Executing with different settings of sqoop.mysql.export.checkpoint.bytes and sqoop.mysql.export.sleep.ms:

33554432B / 0ms: Transferred 4.7579 MB in 8.7175 seconds (558.8826 KB/sec)
102400B / 500ms: Transferred 4.7579 MB in 35.7794 seconds (136.1698 KB/sec)
51200B / 500ms: Transferred 4.758 MB in 57.8675 seconds (84.1959 KB/sec)
51200B / 250ms: Transferred 4.7579 MB in 35.0293 seconds (139.0854 KB/sec)

I have not added unit tests yet; because the change involves calling Thread.sleep, I find it difficult to test. Unfortunately there is no "machine" or "environment" object that could be injected into these classes as a mock to take care of time-related fixtures.

Diff revision 6 (Latest)

1 2 3 4 5 6
1 2 3 4 5 6

  1. src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java: Loading...
src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
Revision a4e8b88 New Change
[20] 61 lines
[+20] [+] public class MySQLExportMapper<KEYIN, VALIN>
62
  public static final long DEFAULT_CHECKPOINT_BYTES = 32 * 1024 * 1024;
62
  public static final long DEFAULT_CHECKPOINT_BYTES = 32 * 1024 * 1024;
63

    
   
63

   
64
  // Configured value for MSYQL_CHECKPOINT_BYTES_KEY.
64
  // Configured value for MSYQL_CHECKPOINT_BYTES_KEY.
65
  protected long checkpointDistInBytes;
65
  protected long checkpointDistInBytes;
66

    
   
66

   

    
   
67
  /** Configuration key that specifies the number of milliseconds

    
   
68
   * to sleep at the end of each checkpoint commit

    
   
69
   * Default is 0, no sleep.

    
   
70
   */

    
   
71
  public static final String MYSQL_CHECKPOINT_SLEEP_KEY =

    
   
72
      "sqoop.mysql.export.sleep.ms";

    
   
73

   

    
   
74
  public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0;

    
   
75

   

    
   
76
  // Configured value for MYSQL_CHECKPOINT_SLEEP_KEY.

    
   
77
  protected long checkpointSleepMs;

    
   
78

   
67
  protected Configuration conf;
79
  protected Configuration conf;
68

    
   
80

   
69
  /** The FIFO being used to communicate with mysqlimport. */
81
  /** The FIFO being used to communicate with mysqlimport. */
70
  protected File fifoFile;
82
  protected File fifoFile;
71

    
   
83

   
[+20] [20] 240 lines
[+20] [+] protected void setup(Context context) {
312
        MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES);
324
        MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES);
313
    if (this.checkpointDistInBytes < 0) {
325
    if (this.checkpointDistInBytes < 0) {
314
      LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
326
      LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
315
      this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
327
      this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
316
    }
328
    }

    
   
329

   

    
   
330
    this.checkpointSleepMs = conf.getLong(

    
   
331
        MYSQL_CHECKPOINT_SLEEP_KEY, DEFAULT_CHECKPOINT_SLEEP_MS);

    
   
332

   

    
   
333
    if (this.checkpointSleepMs < 0) {

    
   
334
      LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_SLEEP_KEY);

    
   
335
      this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS;

    
   
336
    }

    
   
337

   

    
   
338
    if (this.checkpointSleepMs >= conf.getLong("mapred.task.timeout", 0)) {

    
   
339
      LOG.warn("Value for "

    
   
340
          + MYSQL_CHECKPOINT_SLEEP_KEY

    
   
341
          + " has to be smaller than mapred.task.timeout");

    
   
342
      this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS;

    
   
343
    }
317
  }
344
  }
318

    
   
345

   
319
  /**
346
  /**
320
   * Takes a delimited text record (e.g., the output of a 'Text' object),
347
   * Takes a delimited text record (e.g., the output of a 'Text' object),
321
   * re-encodes it for consumption by mysqlimport, and writes it to the pipe.
348
   * re-encodes it for consumption by mysqlimport, and writes it to the pipe.
[+20] [20] 23 lines
[+20] [+] protected void writeRecord(String record, String terminator)
345
    // If bytesWritten is too big, then we should start a new tx by closing
372
    // If bytesWritten is too big, then we should start a new tx by closing
346
    // mysqlimport and opening a new instance of the process.
373
    // mysqlimport and opening a new instance of the process.
347
    if (this.checkpointDistInBytes != 0
374
    if (this.checkpointDistInBytes != 0
348
        && this.bytesWritten > this.checkpointDistInBytes) {
375
        && this.bytesWritten > this.checkpointDistInBytes) {
349
      LOG.info("Checkpointing current export.");
376
      LOG.info("Checkpointing current export.");

    
   
377

   

    
   
378
      if (this.checkpointSleepMs != 0) {

    
   
379
        LOG.info("Pausing.");

    
   
380
        Thread.sleep(this.checkpointSleepMs);

    
   
381
      }

    
   
382

   
350
      closeExportHandles();
383
      closeExportHandles();
351
      initMySQLImportProcess();
384
      initMySQLImportProcess();
352
      this.bytesWritten = 0;
385
      this.bytesWritten = 0;
353
    }
386
    }
354
  }
387
  }
355
}
388
}
356

    
   
389

   
  1. src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java: Loading...