Review Board 1.7.22


Fix for SQOOP-1013

Review Request #11537 - Created May 30, 2013 and updated

Venkat Ranganathan
Reviewers
Sqoop
sqoop-sqoop2
This addresses Boolean, date, time, and timestamp splitters.

This also disallows splitting on character-type (CHAR/VARCHAR/LONGVARCHAR) columns, as discussed in SQOOP-976.
Introduced new unit tests to cover the new functionality.
All tests pass
connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
Revision f80f30d New Change
[20] 15 lines
[+20]
16
 * limitations under the License.
16
 * limitations under the License.
17
 */
17
 */
18
package org.apache.sqoop.connector.jdbc;
18
package org.apache.sqoop.connector.jdbc;
19

    
   
19

   
20
import java.math.BigDecimal;
20
import java.math.BigDecimal;

    
   
21
import java.sql.Date;

    
   
22
import java.sql.Time;

    
   
23
import java.sql.Timestamp;
21
import java.sql.Types;
24
import java.sql.Types;
22
import java.util.LinkedList;
25
import java.util.LinkedList;
23
import java.util.List;
26
import java.util.List;
24

    
   
27

   
25
import org.apache.sqoop.common.SqoopException;
28
import org.apache.sqoop.common.SqoopException;
[+20] [20] 5 lines
[+20]
31

    
   
34

   
32
public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
35
public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
33

    
   
36

   
34
  private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);
37
  private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);
35

    
   
38

   

    
   
39

   
36
  private long numberPartitions;
40
  private long numberPartitions;
37
  private String partitionColumnName;
41
  private String partitionColumnName;
38
  private int partitionColumnType;
42
  private int partitionColumnType;
39
  private String partitionMinValue;
43
  private String partitionMinValue;
40
  private String partitionMaxValue;
44
  private String partitionMaxValue;
[+20] [20] 4 lines
[+20] public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
45
    partitionColumnName = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME);
49
    partitionColumnName = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME);
46
    partitionColumnType = context.getInt(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -1);
50
    partitionColumnType = context.getInt(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -1);
47
    partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
51
    partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
48
    partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);
52
    partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);
49

    
   
53

   

    
   
54
    if (partitionMinValue == null && partitionMaxValue == null) {

    
   
55
      List<Partition> partitions = new LinkedList<Partition>();
Moved from 138

    
   
56
      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
Moved from 139

    
   
57
      partition.setConditions(partitionColumnName + "IS NULL");
Moved from 140

    
   
58
      partitions.add(partition);
Moved from 141

    
   
59
      return partitions;
Moved from 142

    
   
60
    }

    
   
61

   
50
    switch (partitionColumnType) {
62
    switch (partitionColumnType) {
51
    case Types.TINYINT:
63
    case Types.TINYINT:
52
    case Types.SMALLINT:
64
    case Types.SMALLINT:
53
    case Types.INTEGER:
65
    case Types.INTEGER:
54
    case Types.BIGINT:
66
    case Types.BIGINT:
[+20] [20] 12 lines
[+20] public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
67
      return partitionNumericColumn();
79
      return partitionNumericColumn();
68

    
   
80

   
69
    case Types.BIT:
81
    case Types.BIT:
70
    case Types.BOOLEAN:
82
    case Types.BOOLEAN:
71
      // Boolean column
83
      // Boolean column
72
      // TODO: Add partition function
84
      return partitionBooleanColumn();
73

    
   
85

   
74
    case Types.DATE:
86
    case Types.DATE:
75
    case Types.TIME:
87
    case Types.TIME:
76
    case Types.TIMESTAMP:
88
    case Types.TIMESTAMP:
77
      // Date time column
89
      // Date time column
78
      // TODO: Add partition function
90
      return partitionDateTimeColumn();
79

    
   
91

   

    
   
92
      // CHAR/VARCHAR/LONGVARCHAR columns are not supported for splitting

    
   
93
      // See SQOOP-976
80
    case Types.CHAR:
94
    case Types.CHAR:
81
    case Types.VARCHAR:
95
    case Types.VARCHAR:
82
    case Types.LONGVARCHAR:
96
    case Types.LONGVARCHAR:
83
      // Text column
97
      // Text column
84
      // TODO: Add partition function

   
85

    
   

   
86
    default:
98
    default:
87
      throw new SqoopException(
99
      throw new SqoopException(
88
          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
100
          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
89
          String.valueOf(partitionColumnType));
101
          String.valueOf(partitionColumnType));
90
    }
102
    }
91
  }
103
  }
92

    
   
104
  protected List<Partition> partitionDateTimeColumn() {
93
  protected List<Partition> partitionIntegerColumn() {

   
94
    List<Partition> partitions = new LinkedList<Partition>();
105
    List<Partition> partitions = new LinkedList<Partition>();
95

    
   
106

   
96
    if (partitionMinValue == null && partitionMaxValue == null) {
107
    long minDateValue = 0;

    
   
108
    long maxDateValue = 0;

    
   
109

   

    
   
110
    switch(partitionColumnType) {

    
   
111
      case Types.DATE:

    
   
112
        minDateValue = Date.valueOf(partitionMinValue).getTime();

    
   
113
        maxDateValue = Date.valueOf(partitionMaxValue).getTime();

    
   
114
        break;

    
   
115
      case Types.TIME:

    
   
116
        minDateValue = Time.valueOf(partitionMinValue).getTime();

    
   
117
        maxDateValue = Time.valueOf(partitionMaxValue).getTime();

    
   
118
        break;

    
   
119
      case Types.TIMESTAMP:

    
   
120
        minDateValue = Timestamp.valueOf(partitionMinValue).getTime();

    
   
121
        maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime();

    
   
122
        break;

    
   
123
    }

    
   
124
    long interval =  (maxDateValue - minDateValue) / numberPartitions;

    
   
125
    long remainder = (maxDateValue - minDateValue) % numberPartitions;

    
   
126

   

    
   
127
    if (interval == 0) {

    
   
128
      numberPartitions = (int)remainder;

    
   
129
    }

    
   
130
    long lowerBound;

    
   
131
    long upperBound = minDateValue;

    
   
132

   

    
   
133
    Object objLB = null;

    
   
134
    Object objUB = null;

    
   
135

   

    
   
136
    for (int i = 1; i < numberPartitions; i++) {

    
   
137
      lowerBound = upperBound;

    
   
138
      upperBound = lowerBound + interval;

    
   
139
      upperBound += (i <= remainder) ? 1 : 0;

    
   
140

   

    
   
141
      switch(partitionColumnType) {

    
   
142
        case Types.DATE:

    
   
143
          objLB = new Date(lowerBound);

    
   
144
          objUB = new Date(upperBound);

    
   
145
          break;

    
   
146
        case Types.TIME:

    
   
147
          objLB = new Time(lowerBound);

    
   
148
          objUB = new Time(upperBound);

    
   
149
          break;

    
   
150
        case Types.TIMESTAMP:

    
   
151
          objLB = new Timestamp(lowerBound);

    
   
152
          objUB = new Timestamp(upperBound);

    
   
153
          break;

    
   
154
      }

    
   
155

   
97
      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
156
      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
98
      partition.setConditions(partitionColumnName + "IS NULL");
157
      partition.setConditions(

    
   
158
          constructDateConditions(objLB, objUB, false));

    
   
159
      partitions.add(partition);

    
   
160
    }

    
   
161
    switch(partitionColumnType) {

    
   
162
      case Types.DATE:

    
   
163
        objLB = new Date(upperBound);

    
   
164
        objUB = new Date(maxDateValue);

    
   
165
        break;

    
   
166
      case Types.TIME:

    
   
167
        objLB = new Time(upperBound);

    
   
168
        objUB = new Time(maxDateValue);

    
   
169
        break;

    
   
170
      case Types.TIMESTAMP:

    
   
171
        objLB = new Timestamp(upperBound);

    
   
172
        objUB = new Timestamp(maxDateValue);

    
   
173
        break;

    
   
174
    }

    
   
175
    GenericJdbcImportPartition partition = new GenericJdbcImportPartition();

    
   
176
    partition.setConditions(

    
   
177
        constructDateConditions(objLB, objUB, true));
99
      partitions.add(partition);
178
    partitions.add(partition);
100
      return partitions;
179
    return partitions;
101
    }
180
  }
102

    
   
181

   
103
    long minValue = Long.parseLong(partitionMinValue);
182
  protected List<Partition> partitionIntegerColumn() {

    
   
183
    List<Partition> partitions = new LinkedList<Partition>();

    
   
184

   

    
   
185
    long minValue = partitionMinValue == null ? Long.MIN_VALUE

    
   
186
      : Long.parseLong(partitionMinValue);
104
    long maxValue = Long.parseLong(partitionMaxValue);
187
    long maxValue = Long.parseLong(partitionMaxValue);
105

    
   
188

   
106
    long interval =  (maxValue - minValue) / numberPartitions;
189
    long interval =  (maxValue - minValue) / numberPartitions;
107
    long remainder = (maxValue - minValue) % numberPartitions;
190
    long remainder = (maxValue - minValue) % numberPartitions;
108

    
   
191

   
[+20] [20] 23 lines
[+20] public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
132
  }
215
  }
133

    
   
216

   
134
  protected List<Partition> partitionFloatingPointColumn() {
217
  protected List<Partition> partitionFloatingPointColumn() {
135
    List<Partition> partitions = new LinkedList<Partition>();
218
    List<Partition> partitions = new LinkedList<Partition>();
136

    
   
219

   
137
    if (partitionMinValue == null && partitionMaxValue == null) {

   
138
      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();

   
139
      partition.setConditions(partitionColumnName + "IS NULL");

   
140
      partitions.add(partition);

   
141
      return partitions;

   
142
    }

   
143

    
   
220

   
144
    double minValue = Double.parseDouble(partitionMinValue);
221
    double minValue = partitionMinValue == null ? Double.MIN_VALUE

    
   
222
      : Double.parseDouble(partitionMinValue);
145
    double maxValue = Double.parseDouble(partitionMaxValue);
223
    double maxValue = Double.parseDouble(partitionMaxValue);
146

    
   
224

   
147
    double interval =  (maxValue - minValue) / numberPartitions;
225
    double interval =  (maxValue - minValue) / numberPartitions;
148

    
   
226

   
149
    double lowerBound;
227
    double lowerBound;
[+20] [20] 16 lines
[+20] public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
166
    return partitions;
244
    return partitions;
167
  }
245
  }
168

    
   
246

   
169
  protected List<Partition> partitionNumericColumn() {
247
  protected List<Partition> partitionNumericColumn() {
170
    List<Partition> partitions = new LinkedList<Partition>();
248
    List<Partition> partitions = new LinkedList<Partition>();
171

    
   

   
172
    // All null values will result in single partition

   
173
    if (partitionMinValue == null && partitionMaxValue == null) {

   
174
      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();

   
175
      partition.setConditions(partitionColumnName + "IS NULL");

   
176
      partitions.add(partition);

   
177
      return partitions;

   
178
    }

   
179

    
   

   
180
    // Having one end in null is not supported
249
    // Having one end in null is not supported
181
    if (partitionMinValue == null || partitionMaxValue == null) {
250
    if (partitionMinValue == null || partitionMaxValue == null) {
182
      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
251
      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
183
    }
252
    }
184

    
   
253

   
185
    BigDecimal minValue = new BigDecimal(partitionMinValue);
254
    BigDecimal minValue = new BigDecimal(partitionMinValue);
186
    BigDecimal maxValue = new BigDecimal(partitionMaxValue);
255
    BigDecimal maxValue = new BigDecimal(partitionMaxValue);
187

    
   
256

   
188
    // Having one single value means that we can create only one single split
257
    // Having one single value means that we can create only one single split
189
    if(minValue.equals(maxValue)) {
258
    if(minValue.equals(maxValue)) {
190
      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
259
      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
191
      partition.setConditions(constructConditions(minValue));
260
      partition.setConditions(constructConditions(minValue));
192
      partitions.add(partition);
261
      partitions.add(partition);

    
   
262
      return partitions;
193
    }
263
    }
194

    
   
264

   
195
    // Get all the split points together.
265
    // Get all the split points together.
196
    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
266
    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
197

    
   
267

   
[+20] [20] 27 lines
[+20] public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
225
    }
295
    }
226

    
   
296

   
227
    return partitions;
297
    return partitions;
228
  }
298
  }
229

    
   
299

   

    
   
300
  protected  List<Partition> partitionBooleanColumn() {

    
   
301
    List<Partition> partitions = new LinkedList<Partition>();

    
   
302

   

    
   
303

   

    
   
304
    Boolean minValue = parseBooleanValue(partitionMinValue);

    
   
305
    Boolean maxValue = parseBooleanValue(partitionMaxValue);

    
   
306

   

    
   
307
    StringBuilder conditions = new StringBuilder();

    
   
308

   

    
   
309
    // Having one single value means that we can create only one single split

    
   
310
    if(minValue.equals(maxValue)) {

    
   
311
      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();

    
   
312

   

    
   
313
      conditions.append(partitionColumnName).append(" = ")

    
   
314
          .append(maxValue);

    
   
315
      partition.setConditions(conditions.toString());
Moved from 140

    
   
316
      partitions.add(partition);
Moved from 141

    
   
317
      return partitions;
Moved from 142

    
   
318
    }

    
   
319

   

    
   
320
    GenericJdbcImportPartition partition = new GenericJdbcImportPartition();

    
   
321

   

    
   
322
    if (partitionMinValue == null) {

    
   
323
      conditions = new StringBuilder();

    
   
324
      conditions.append(partitionColumnName).append(" IS NULL");

    
   
325
      partition.setConditions(conditions.toString());

    
   
326
      partitions.add(partition);

    
   
327
    }

    
   
328
    partition = new GenericJdbcImportPartition();

    
   
329
    conditions = new StringBuilder();

    
   
330
    conditions.append(partitionColumnName).append(" = TRUE");

    
   
331
    partition.setConditions(conditions.toString());

    
   
332
    partitions.add(partition);

    
   
333
    partition = new GenericJdbcImportPartition();

    
   
334
    conditions = new StringBuilder();

    
   
335
    conditions.append(partitionColumnName).append(" = FALSE");

    
   
336
    partition.setConditions(conditions.toString());
Moved from 140

    
   
337
    partitions.add(partition);
Moved from 141

    
   
338
    return partitions;
Moved from 142

    
   
339
  }

    
   
340

   

    
   
341
  private Boolean parseBooleanValue(String value) {

    
   
342
    if (value == null) {

    
   
343
      return null;

    
   
344
    }

    
   
345
    if (value.equals("1")) {

    
   
346
      return Boolean.TRUE;

    
   
347
    } else if (value.equals("0")) {

    
   
348
      return Boolean.FALSE;

    
   
349
    } else {

    
   
350
      return Boolean.parseBoolean(value);

    
   
351
    }

    
   
352
  }

    
   
353

   
230
  protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
354
  protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
231
    try {
355
    try {
232
      return numerator.divide(denominator);
356
      return numerator.divide(denominator);
233
    } catch (ArithmeticException ae) {
357
    } catch (ArithmeticException ae) {
234
      return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
358
      return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
[+20] [20] 19 lines
[+20] [+] protected String constructConditions(Object value) {
254
      .append(" = ")
378
      .append(" = ")
255
      .append(value)
379
      .append(value)
256
      .toString()
380
      .toString()
257
     ;
381
     ;
258
  }
382
  }

    
   
383

   

    
   
384
  protected String constructDateConditions(

    
   
385
      Object lowerBound, Object upperBound, boolean lastOne) {

    
   
386
    StringBuilder conditions = new StringBuilder();

    
   
387
    conditions.append('\'').append(lowerBound.toString()).append('\'');

    
   
388
    conditions.append(" <= ");

    
   
389
    conditions.append(partitionColumnName);

    
   
390
    conditions.append(" AND ");

    
   
391
    conditions.append(partitionColumnName);

    
   
392
    conditions.append(lastOne ? " <= " : " < ");

    
   
393
    conditions.append('\'').append(upperBound.toString()).append('\'');

    
   
394
    return conditions.toString();

    
   
395
  }

    
   
396

   

    
   
397
  protected String constructDateConditions(Object value) {

    
   
398
    return new StringBuilder()

    
   
399
        .append(partitionColumnName)

    
   
400
        .append(" = ")

    
   
401
        .append('\'').append(value.toString()).append('\'')

    
   
402
        .toString();

    
   
403
  }
259
}
404
}
connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
Revision ee314d0 New Change
 
  1. connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java: Loading...
  2. connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java: Loading...