

HBASE-5741: ImportTsv does not check for table existence

Review Request #4700 - Created April 11, 2012

Submitter: Himanshu Vashishtha
Branch: trunk
Bugs: HBASE-5741
Reviewers: hbase
Repository: hbase-git
Description: ImportTsv has a bulk output option that writes HFiles to a user-defined directory. The current code assumes that the target table named on the command line already exists, and throws an exception otherwise. Here is a patch that creates the table when it does not exist.
Testing done: Added a new test for bulk output; all ImportTsv tests pass.
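For context on how this path is exercised, a hedged driver sketch follows: it builds the ImportTsv job programmatically with the bulk output key set, mirroring the patched main() flow. The table name, column specs, and paths are illustrative, and the class is placed in the ImportTsv package only so it can reach the package-private constants and the new createHbaseAdmin hook; this is not part of the patch.

    package org.apache.hadoop.hbase.mapreduce;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.mapreduce.Job;

    // Hypothetical driver; table name, column specs, and paths are invented.
    public class ImportTsvBulkDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // First token names the row key; the rest are family:qualifier specs.
        conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,info:name,info:age");
        // Setting a bulk output path switches ImportTsv into HFile mode.
        conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/tmp/bulk-out");
        // Initialize the static admin handle the patch consults; this hook is
        // package-private, hence this sketch lives in the same package.
        ImportTsv.createHbaseAdmin(conf);
        // With the patch, a missing "myTable" is created from the column
        // specs instead of failing job setup.
        Job job = ImportTsv.createSubmittableJob(conf,
            new String[] { "myTable", "/tmp/input.tsv" });
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }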

Diff revision 2 (Latest)


Files:
  1. src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
  2. src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (revision ab22fc4)
@@ import section @@
 import org.apache.hadoop.hbase.util.Base64;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
@@ public class ImportTsv @@
   final static String COLUMNS_CONF_KEY = "importtsv.columns";
   final static String SEPARATOR_CONF_KEY = "importtsv.separator";
   final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
   final static String DEFAULT_SEPARATOR = "\t";
   final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
+  private static HBaseAdmin hbaseAdmin;
 
   static class TsvParser {
     /**
      * Column families and qualifiers mapped to the TSV columns
      */
@@ public static Job createSubmittableJob(Configuration conf, String[] args) @@
     job.setInputFormatClass(TextInputFormat.class);
     job.setMapperClass(mapperClass);
 
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
     if (hfileOutPath != null) {
+      if (!doesTableExist(tableName)) {
+        createTable(conf, tableName);
+      }
       HTable table = new HTable(conf, tableName);
       job.setReducerClass(PutSortReducer.class);
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
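For context, the failure this hunk prevents: without the check, setup goes straight to the HTable constructor, which fails when the table is absent. The review description only says "throws an exception"; TableNotFoundException is the usual exception from this constructor in this era of the client API and is stated here as an assumption. A minimal sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableNotFoundException;
    import org.apache.hadoop.hbase.client.HTable;

    public class PrePatchFailureDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try {
          // Pre-patch, ImportTsv reached this constructor unconditionally.
          HTable table = new HTable(conf, "tableThatDoesNotExist"); // invented name
          table.close();
        } catch (TableNotFoundException e) {
          // The failure mode the new doesTableExist/createTable path avoids.
          System.err.println("Table missing: " + e.getMessage());
        }
      }
    }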
@@ public static Job createSubmittableJob(Configuration conf, String[] args) @@
     TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
         com.google.common.base.Function.class /* Guava used by TsvParser */);
     return job;
   }
 
+  private static boolean doesTableExist(String tableName) throws IOException {
+    return (hbaseAdmin.tableExists(tableName.getBytes()));
+  }
+
+  private static void createTable(Configuration conf, String tableName)
+      throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName.getBytes());
+    String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
+    Set<String> cfSet = new HashSet<String>();
+    for (String aColumn : columns) {
+      if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn))
+        continue;
+      // we are just bothered about the first one (in case this is a cf:cq)
+      cfSet.add(aColumn.split(":", 2)[0]);
+    }
+    for (String cf : cfSet) {
+      HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
+      htd.addFamily(hcd);
+    }
+    hbaseAdmin.createTable(htd);
+  }
+
   /*
    * @param errorMsg Error message.  Can be null.
    */
   private static void usage(final String errorMsg) {
     if (errorMsg != null && errorMsg.length() > 0) {
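To make the family derivation in createTable above concrete, here is a small standalone sketch of the same loop with an invented column spec; only the distinct family prefixes survive:

    import java.util.HashSet;
    import java.util.Set;

    public class CfSetDemo {
      public static void main(String[] args) {
        // Invented spec: row key marker plus three family:qualifier columns.
        String[] columns = { "HBASE_ROW_KEY", "info:name", "info:age", "d:c1" };
        Set<String> cfSet = new HashSet<String>();
        for (String aColumn : columns) {
          if ("HBASE_ROW_KEY".equals(aColumn))
            continue;                            // the row key is not a column family
          cfSet.add(aColumn.split(":", 2)[0]);   // keep only the family part of cf:cq
        }
        System.out.println(cfSet);               // distinct families, e.g. [d, info]
      }
    }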
@@ private static void usage(final String errorMsg) @@
     System.err.println(usage);
   }
 
   /**
+   * Used only by test method
+   * @param conf
+   */
+  static void createHbaseAdmin(Configuration conf) throws IOException {
+    hbaseAdmin = new HBaseAdmin(conf);
+  }
+
+  /**
    * Main entry point.
    *
    * @param args  The command line parameters.
    * @throws Exception When running the job fails.
    */
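Design note on the hook above: createSubmittableJob consults the static hbaseAdmin field rather than constructing its own admin, and main() initializes that field (see the next hunk). The package-private createHbaseAdmin(conf) lets a test that calls createSubmittableJob directly supply the admin without going through main().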
@@ public static void main(String[] args) throws Exception @@
     // Make sure one or more columns are specified
     if (columns.length < 2) {
       usage("One or more columns in addition to the row key are required");
       System.exit(-1);
     }
 
+    hbaseAdmin = new HBaseAdmin(conf);
     Job job = createSubmittableJob(conf, otherArgs);
     System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
 
 }
src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (revision ac30a62)
 
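The TestImportTsv hunks did not render in this view. As a rough, purely hypothetical sketch of what a bulk-output test of this shape typically looks like, reconstructed only from the main-code changes above (class name, table name, paths, and assertions are all invented, and a real test would run against a mini-cluster configuration):

    package org.apache.hadoop.hbase.mapreduce;

    import static org.junit.Assert.assertTrue;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.mapreduce.Job;
    import org.junit.Test;

    // Hypothetical sketch; NOT the actual TestImportTsv change.
    public class BulkOutputSketchTest {
      @Test
      public void testBulkOutputCreatesMissingTable() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
        conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/tmp/bulk-test-out");
        ImportTsv.createHbaseAdmin(conf);        // the hook this patch adds for tests
        Job job = ImportTsv.createSubmittableJob(conf,
            new String[] { "bulkTestTable", "/tmp/bulk-test-input.tsv" });
        // Table should be auto-created and HFiles written to the output dir.
        assertTrue(job.waitForCompletion(true));
      }
    }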