Review Board 1.7.22


SQOOP-604 Easy throttling feature for MySQL exports

Review Request #7135 - Created Sept. 17, 2012 and submitted

Zoltán Tóth-Czifra
Reviewers
Sqoop
sqoop-trunk
Code review for SQOOP-604, see https://issues.apache.org/jira/browse/SQOOP-604

The solution in short: use the already existing "checkpoint" feature of the direct (--direct) MySQL exports (the export process is restarted every X bytes written) and extend it with a new config value that simply makes the thread sleep for Y milliseconds at each checkpoint. With a low enough byte count limit this can be a simple yet powerful throttling mechanism.
Executing with different settings of sqoop.mysql.export.checkpoint.bytes and sqoop.mysql.export.sleep.ms:

33554432B / 0ms: Transferred 4.7579 MB in 8.7175 seconds (558.8826 KB/sec)
102400B / 500ms: Transferred 4.7579 MB in 35.7794 seconds (136.1698 KB/sec)
51200B / 500ms: Transferred 4.758 MB in 57.8675 seconds (84.1959 KB/sec)
51200B / 250ms: Transferred 4.7579 MB in 35.0293 seconds (139.0854 KB/sec)

I have not added unit tests yet; because the change involves calling Thread.sleep, I find testing it difficult. Unfortunately there is no "machine" or "environment" object that could be injected into these classes as a mock to take care of time-related fixtures.

Diff revision 1

This is not the most recent revision of the diff. The latest diff is revision 6. See what's changed.

1 2 3 4 5 6
1 2 3 4 5 6

  1. src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java: Loading...
src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
Revision a4e8b88 New Change
[20] 61 lines
[+20] [+] public class MySQLExportMapper<KEYIN, VALIN>
62
  public static final long DEFAULT_CHECKPOINT_BYTES = 32 * 1024 * 1024;
62
  public static final long DEFAULT_CHECKPOINT_BYTES = 32 * 1024 * 1024;
63

    
   
63

   
64
  // Configured value for MSYQL_CHECKPOINT_BYTES_KEY.
64
  // Configured value for MSYQL_CHECKPOINT_BYTES_KEY.
65
  protected long checkpointDistInBytes;
65
  protected long checkpointDistInBytes;
66

    
   
66

   

    
   
67
  /** Configuration key that specifies the number of milliseconds

    
   
68
   * to sleep at the end of each checkpoint commit

    
   
69
   * Default is 0, no sleep.

    
   
70
   */

    
   
71
  public static final String MYSQL_CHECKPOINT_SLEEP_KEY =

    
   
72
      "sqoop.mysql.export.sleep.ms";

    
   
73

   

    
   
74
  public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0;

    
   
75

   

    
   
76
  // Configured value for MYSQL_CHECKPOINT_SLEEP_KEY.

    
   
77
  protected long checkpointSleepMs;

    
   
78

   
67
  protected Configuration conf;
79
  protected Configuration conf;
68

    
   
80

   
69
  /** The FIFO being used to communicate with mysqlimport. */
81
  /** The FIFO being used to communicate with mysqlimport. */
70
  protected File fifoFile;
82
  protected File fifoFile;
71

    
   
83

   
[+20] [20] 240 lines
[+20] [+] protected void setup(Context context) {
312
        MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES);
324
        MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES);
313
    if (this.checkpointDistInBytes < 0) {
325
    if (this.checkpointDistInBytes < 0) {
314
      LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
326
      LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
315
      this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
327
      this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
316
    }
328
    }

    
   
329

   

    
   
330
    this.checkpointSleepMs = conf.getLong(

    
   
331
        MYSQL_CHECKPOINT_SLEEP_KEY, DEFAULT_CHECKPOINT_SLEEP_MS);

    
   
332
    if (this.checkpointDistInBytes < 0) {

    
   
333
      LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_SLEEP_KEY);

    
   
334
      this.checkpointDistInBytes = DEFAULT_CHECKPOINT_SLEEP_MS;

    
   
335
    }
317
  }
336
  }
318

    
   
337

   
319
  /**
338
  /**
320
   * Takes a delimited text record (e.g., the output of a 'Text' object),
339
   * Takes a delimited text record (e.g., the output of a 'Text' object),
321
   * re-encodes it for consumption by mysqlimport, and writes it to the pipe.
340
   * re-encodes it for consumption by mysqlimport, and writes it to the pipe.
[+20] [20] 23 lines
[+20] [+] protected void writeRecord(String record, String terminator)
345
    // If bytesWritten is too big, then we should start a new tx by closing
364
    // If bytesWritten is too big, then we should start a new tx by closing
346
    // mysqlimport and opening a new instance of the process.
365
    // mysqlimport and opening a new instance of the process.
347
    if (this.checkpointDistInBytes != 0
366
    if (this.checkpointDistInBytes != 0
348
        && this.bytesWritten > this.checkpointDistInBytes) {
367
        && this.bytesWritten > this.checkpointDistInBytes) {
349
      LOG.info("Checkpointing current export.");
368
      LOG.info("Checkpointing current export.");

    
   
369

   

    
   
370
      if(this.checkpointSleepMs != 0) {

    
   
371
        LOG.info("Pausing.");

    
   
372
        Thread.sleep(this.checkpointSleepMs);

    
   
373
      }

    
   
374

   
350
      closeExportHandles();
375
      closeExportHandles();
351
      initMySQLImportProcess();
376
      initMySQLImportProcess();
352
      this.bytesWritten = 0;
377
      this.bytesWritten = 0;
353
    }
378
    }
354
  }
379
  }
355
}
380
}
356

    
   
381

   
  1. src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java: Loading...