Review Board 1.7.22


FLUME-1906. Optionally disable WAL in HBase Sink

Review Request #9457 - Created Feb. 14, 2013 and submitted

Hari Shreedharan
FLUME-1906
Reviewers
Flume
flume-git
Added support for WAL disabling optionally.

 
flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
Revision 0b6f885 New Change
[20] 110 lines
[+20] [+] public class AsyncHBaseSink extends AbstractSink implements Configurable {
111
  private long timeout;
111
  private long timeout;
112
  private String zkQuorum;
112
  private String zkQuorum;
113
  private String zkBaseDir;
113
  private String zkBaseDir;
114
  private ExecutorService sinkCallbackPool;
114
  private ExecutorService sinkCallbackPool;
115
  private boolean isTest;
115
  private boolean isTest;

    
   
116
  private boolean enableWal = true;
116

    
   
117

   
117
  public AsyncHBaseSink(){
118
  public AsyncHBaseSink(){
118
    this(null);
119
    this(null);
119
  }
120
  }
120

    
   
121

   
[+20] [20] 63 lines
[+20] [+] public Status process() throws EventDeliveryException {
184
          List<PutRequest> actions = serializer.getActions();
185
          List<PutRequest> actions = serializer.getActions();
185
          List<AtomicIncrementRequest> increments = serializer.getIncrements();
186
          List<AtomicIncrementRequest> increments = serializer.getIncrements();
186
          callbacksExpected.addAndGet(actions.size() + increments.size());
187
          callbacksExpected.addAndGet(actions.size() + increments.size());
187

    
   
188

   
188
          for (PutRequest action : actions) {
189
          for (PutRequest action : actions) {

    
   
190
            action.setDurable(enableWal);
189
            client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
191
            client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
190
          }
192
          }
191
          for (AtomicIncrementRequest increment : increments) {
193
          for (AtomicIncrementRequest increment : increments) {
192
            client.atomicIncrement(increment).addCallbacks(
194
            client.atomicIncrement(increment).addCallbacks(
193
                    incrementSuccessCallback, incrementFailureCallback);
195
                    incrementSuccessCallback, incrementFailureCallback);
[+20] [20] 126 lines
[+20] [+] public void configure(Context context) {
320
      zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
322
      zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
321
        HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
323
        HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
322
    }
324
    }
323
    Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(),
325
    Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(),
324
        "The Zookeeper quorum cannot be null and should be specified.");
326
        "The Zookeeper quorum cannot be null and should be specified.");

    
   
327

   

    
   
328
    enableWal = context.getBoolean(HBaseSinkConfigurationConstants

    
   
329
      .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL);

    
   
330
    logger.info("The write to WAL option is set to: " + String.valueOf(enableWal));

    
   
331
    if(!enableWal) {

    
   
332
      logger.warn("AsyncHBaseSink's enableWal configuration is set to false. " +

    
   
333
        "All writes to HBase will have WAL disabled, and any data in the " +

    
   
334
        "memstore of this region in the Region Server could be lost!");

    
   
335
    }
325
  }
336
  }
326

    
   
337

   
327
  @VisibleForTesting
338
  @VisibleForTesting
328
  boolean isConfNull() {
339
  boolean isConfNull() {
329
    return conf == null;
340
    return conf == null;
[+20] [20] 187 lines
flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
Revision 835a69e New Change
 
flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
Revision fb6bd4e New Change
 
  1. flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java: Loading...
  2. flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java: Loading...
  3. flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java: Loading...