Review Board 1.7.22


Fix for SQOOP-1013

Review Request #11537 - Created May 30, 2013 and updated

Venkat Ranganathan
Reviewers
Sqoop
sqoop-sqoop2
This addresses Boolean, date, time, and timestamp splitters.

This also disallows char type splitters as discussed in SQOOP-976
Introduced new unit tests to test new functionality
All tests pass
connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
Diff Revision 2 Diff Revision 4
[20] 86 lines
[+20] [+] public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
87
    case Types.TIME:
87
    case Types.TIME:
88
    case Types.TIMESTAMP:
88
    case Types.TIMESTAMP:
89
      // Date time column
89
      // Date time column
90
      return partitionDateTimeColumn();
90
      return partitionDateTimeColumn();
91

    
   
91

   
92
      // CHAR/VARCHAR/LONGVARCHAR columns are not supported for splitting

   
93
      // See SQOOP-976

   
94
    case Types.CHAR:
92
    case Types.CHAR:
95
    case Types.VARCHAR:
93
    case Types.VARCHAR:
96
    case Types.LONGVARCHAR:
94
    case Types.LONGVARCHAR:
97
      // Text column
95
      // Text column

    
   
96
      return partitionTextColumn();

    
   
97

   
98
    default:
98
    default:
99
      throw new SqoopException(
99
      throw new SqoopException(
100
          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
100
          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
101
          String.valueOf(partitionColumnType));
101
          String.valueOf(partitionColumnType));
102
    }
102
    }
[+20] [20] 74 lines
[+20] public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
177
        constructDateConditions(objLB, objUB, true));
177
        constructDateConditions(objLB, objUB, true));
178
    partitions.add(partition);
178
    partitions.add(partition);
179
    return partitions;
179
    return partitions;
180
  }
180
  }
181

    
   
181

   

    
   
182
  protected List<Partition> partitionTextColumn() {

    
   
183
    List<Partition> partitions = new LinkedList<Partition>();

    
   
184

   

    
   
185
    String minStringValue = null;

    
   
186
    String maxStringValue = null;

    
   
187

   

    
   
188
    // Remove common prefix if any as it does not affect outcome.

    
   
189
    int maxPrefixLen = Math.min(partitionMinValue.length(),

    
   
190
        partitionMaxValue.length());

    
   
191
    // Calculate common prefix length

    
   
192
    int cpLen = 0;

    
   
193

   

    
   
194
    for (cpLen = 0; cpLen < maxPrefixLen; cpLen++) {

    
   
195
      char c1 = partitionMinValue.charAt(cpLen);

    
   
196
      char c2 = partitionMaxValue.charAt(cpLen);

    
   
197
      if (c1 != c2) {

    
   
198
        break;

    
   
199
      }

    
   
200
    }

    
   
201

   

    
   
202
    // The common prefix has length 'cpLen'. Extract it from both.

    
   
203
    String prefix = partitionMinValue.substring(0, cpLen);

    
   
204
    minStringValue = partitionMinValue.substring(cpLen);

    
   
205
    maxStringValue = partitionMaxValue.substring(cpLen);

    
   
206

   

    
   
207
    BigDecimal minStringBD = textToBigDecimal(minStringValue);

    
   
208
    BigDecimal maxStringBD = textToBigDecimal(maxStringValue);

    
   
209

   

    
   
210
    // Having one single value means that we can create only one single split

    
   
211
    if(minStringBD.equals(maxStringBD)) {

    
   
212
      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();

    
   
213
      partition.setConditions(constructTextConditions(prefix, maxStringBD));

    
   
214
      partitions.add(partition);

    
   
215
      return partitions;

    
   
216
    }

    
   
217

   

    
   
218
    // Get all the split points together.

    
   
219
    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();

    
   
220

   

    
   
221
    BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD),

    
   
222
        new BigDecimal(numberPartitions));

    
   
223
    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {

    
   
224
      splitSize = NUMERIC_MIN_INCREMENT;

    
   
225
    }

    
   
226

   

    
   
227
    BigDecimal curVal = minStringBD;

    
   
228

   

    
   
229
    while (curVal.compareTo(maxStringBD) <= 0) {

    
   
230
      splitPoints.add(curVal);

    
   
231
      curVal = curVal.add(splitSize);

    
   
232
    }

    
   
233

   

    
   
234
    if (splitPoints.size() == 0

    
   
235
        || splitPoints.get(0).compareTo(minStringBD) != 0) {

    
   
236
      splitPoints.add(0, minStringBD);

    
   
237
    }

    
   
238

   

    
   
239
    if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0

    
   
240
        || splitPoints.size() == 1) {

    
   
241
      splitPoints.add(maxStringBD);

    
   
242
    }

    
   
243

   

    
   
244
    // Turn the split points into a set of string intervals.

    
   
245
    BigDecimal start = splitPoints.get(0);

    
   
246
    for (int i = 1; i < splitPoints.size(); i++) {

    
   
247
      BigDecimal end = splitPoints.get(i);

    
   
248

   

    
   
249
      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();

    
   
250
      partition.setConditions(constructTextConditions(prefix, start,

    
   
251
          end, i == splitPoints.size() - 1));

    
   
252
      partitions.add(partition);

    
   
253

   

    
   
254
      start = end;

    
   
255
    }

    
   
256

   

    
   
257
    return partitions;

    
   
258
  }

    
   
259

   

    
   
260

   
182
  protected List<Partition> partitionIntegerColumn() {
261
  protected List<Partition> partitionIntegerColumn() {
183
    List<Partition> partitions = new LinkedList<Partition>();
262
    List<Partition> partitions = new LinkedList<Partition>();
184

    
   
263

   
185
    long minValue = partitionMinValue == null ? Long.MIN_VALUE
264
    long minValue = partitionMinValue == null ? Long.MIN_VALUE
186
      : Long.parseLong(partitionMinValue);
265
      : Long.parseLong(partitionMinValue);
[+20] [20] 77 lines
[+20] public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
264

    
   
343

   
265
    // Get all the split points together.
344
    // Get all the split points together.
266
    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
345
    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
267

    
   
346

   
268
    BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions));
347
    BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions));

    
   
348

   
269
    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
349
    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
270
      splitSize = NUMERIC_MIN_INCREMENT;
350
      splitSize = NUMERIC_MIN_INCREMENT;
271
    }
351
    }
272

    
   
352

   
273
    BigDecimal curVal = minValue;
353
    BigDecimal curVal = minValue;
[+20] [20] 118 lines
[+20] [+] protected String constructDateConditions(
392
    conditions.append(lastOne ? " <= " : " < ");
472
    conditions.append(lastOne ? " <= " : " < ");
393
    conditions.append('\'').append(upperBound.toString()).append('\'');
473
    conditions.append('\'').append(upperBound.toString()).append('\'');
394
    return conditions.toString();
474
    return conditions.toString();
395
  }
475
  }
396

    
   
476

   
397
  protected String constructDateConditions(Object value) {
477
  protected String constructTextConditions(String prefix,

    
   
478
      Object lowerBound, Object upperBound, boolean lastOne) {

    
   
479
    StringBuilder conditions = new StringBuilder();

    
   
480
    String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound);

    
   
481
    String ubString = prefix + bigDecimalToText((BigDecimal)upperBound);

    
   
482
    conditions.append('\'').append(lbString).append('\'');

    
   
483
    conditions.append(" <= ");

    
   
484
    conditions.append(partitionColumnName);

    
   
485
    conditions.append(" AND ");

    
   
486
    conditions.append(partitionColumnName);

    
   
487
    conditions.append(lastOne ? " <= " : " < ");

    
   
488
    conditions.append('\'').append(ubString).append('\'');

    
   
489
    return conditions.toString();

    
   
490
  }

    
   
491

   

    
   
492
  protected String constructTextConditions(String prefix, Object value) {
398
    return new StringBuilder()
493
    return new StringBuilder()
399
        .append(partitionColumnName)
494
      .append(partitionColumnName)
400
        .append(" = ")
495
      .append(" = ").append('\'')
401
        .append('\'').append(value.toString()).append('\'')
496
      .append(prefix + bigDecimalToText((BigDecimal)value))
402
        .toString();
497
      .append('\'').toString()

    
   
498
     ;

    
   
499
  }

    
   
500

   

    
   
501

   

    
   
502
  /**

    
   
503
   *  Converts a string to a BigDecimal representation in Base 2^16 format.

    
   
504
   *  Given a string 's' containing characters s_0, s_1,..s_n,

    
   
505
   *  the string is interpreted as the number: 0.s_0 s_1 s_2.. s_min(n,MAX_CHARS_TO_CONVERT-1)

    
   
506
   *  This can be split and each split point can be converted back to

    
   
507
   *  a string value for comparison purposes.   The number of characters

    
   
508
   *  is restricted to prevent repeating fractions and rounding errors

    
   
509
   *  towards the higher fraction positions.

    
   
510
   */

    
   
511
  private static final BigDecimal UNITS_BASE = new BigDecimal(65536);

    
   
512
  private static final int MAX_CHARS_TO_CONVERT = 4;

    
   
513

   

    
   
514
  private BigDecimal textToBigDecimal(String str) {

    
   
515
    BigDecimal result = BigDecimal.ZERO;

    
   
516
    BigDecimal divisor = UNITS_BASE;

    
   
517

   

    
   
518
    int len = Math.min(str.length(), MAX_CHARS_TO_CONVERT);

    
   
519

   

    
   
520
    for (int n = 0; n < len; ++n) {

    
   
521
      int codePoint = str.codePointAt(n);

    
   
522
      BigDecimal val = divide(new BigDecimal(codePoint), divisor);

    
   
523
      result = result.add(val);

    
   
524
      divisor = divisor.multiply(UNITS_BASE);
403
  }
525
    }

    
   
526

   

    
   
527
    return result;

    
   
528
  }

    
   
529

   

    
   
530
  private String bigDecimalToText(BigDecimal bd) {

    
   
531
    BigDecimal curVal = bd.stripTrailingZeros();

    
   
532
    StringBuilder sb = new StringBuilder();

    
   
533

   

    
   
534
    for (int n = 0; n < MAX_CHARS_TO_CONVERT; ++n) {

    
   
535
      curVal = curVal.multiply(UNITS_BASE);

    
   
536
      int cp = curVal.intValue();

    
   
537
      if (0 == cp) {

    
   
538
        break;

    
   
539
      }

    
   
540
      curVal = curVal.subtract(new BigDecimal(cp));

    
   
541
      sb.append(Character.toChars(cp));

    
   
542
    }

    
   
543
    return sb.toString();

    
   
544
  }

    
   
545

   
404
}
546
}
connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
Diff Revision 2 Diff Revision 4
 
  1. connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java: Loading...
  2. connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java: Loading...