Review Board 1.7.22


AsyncHbase Sink bugfix plus tests errors on Windows

Review Request #12889 - Created July 24, 2013 and updated

Roshan Naik
flume-1.5
FLUME-2134
Reviewers
Flume
flume-git
1) The Async HBase sink uses conf.get("hbase.zookeeper.quorum") to get the zookeeper quorum info... which on windows is only returning "localhost". HBase folks advised the use of  ZKConfig.getZKQuorumServersString(conf) instead. I confirmed it returns the right value  "localhost:port#" on both Windows and Linux.

2) Setup code in Async HBase Sink tests are simplified to use testUtility provided by HBase to spin up a HBase cluster. The Temporary directory creation in the old code caused problems when used by HBase, if the directory name had a space in it. 
Tests pass both on Linux and Windows.
flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
Revision 7020fcd New Change
[20] 34 lines
[+20]
35
import org.apache.flume.conf.Configurable;
35
import org.apache.flume.conf.Configurable;
36
import org.apache.flume.sink.AbstractSink;
36
import org.apache.flume.sink.AbstractSink;
37
import org.apache.hadoop.conf.Configuration;
37
import org.apache.hadoop.conf.Configuration;
38
import org.apache.hadoop.hbase.HBaseConfiguration;
38
import org.apache.hadoop.hbase.HBaseConfiguration;
39
import org.apache.hadoop.hbase.HConstants;
39
import org.apache.hadoop.hbase.HConstants;

    
   
40
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
40
import org.hbase.async.AtomicIncrementRequest;
41
import org.hbase.async.AtomicIncrementRequest;
41
import org.hbase.async.HBaseClient;
42
import org.hbase.async.HBaseClient;
42
import org.hbase.async.PutRequest;
43
import org.hbase.async.PutRequest;
43
import org.slf4j.Logger;
44
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
45
import org.slf4j.LoggerFactory;
[+20] [20] 271 lines
[+20] [+] public void configure(Context context) {
316
          HBaseSinkConfigurationConstants.DEFAULT_ZK_ZNODE_PARENT);
317
          HBaseSinkConfigurationConstants.DEFAULT_ZK_ZNODE_PARENT);
317
    } else {
318
    } else {
318
      if (conf == null) { //In tests, we pass the conf in.
319
      if (conf == null) { //In tests, we pass the conf in.
319
        conf = HBaseConfiguration.create();
320
        conf = HBaseConfiguration.create();
320
      }
321
      }
321
      zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
322
      zkQuorum = ZKConfig.getZKQuorumServersString(conf);
322
      zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
323
      zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
323
        HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
324
        HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
324
    }
325
    }
325
    Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(),
326
    Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(),
326
        "The Zookeeper quorum cannot be null and should be specified.");
327
        "The Zookeeper quorum cannot be null and should be specified.");
[+20] [20] 201 lines
flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
Revision 7ddfdae New Change
 
  1. flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java: Loading...
  2. flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java: Loading...