Review Board 1.7.22


FLUME-2202. AsyncHBaseSink should coalesce increments to reduce RPC roundtrips

Review Request #14454 - Created Oct. 2, 2013 and updated

Hari Shreedharan
FLUME-2202
Reviewers
Flume
flume-git
Added a new config to coalesce increments. 
All current tests pass. Added 2 new tests
flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
Revision 5e297b1 New Change
[20] 15 lines
[+20]
16
 * specific language governing permissions and limitations
16
 * specific language governing permissions and limitations
17
 * under the License.
17
 * under the License.
18
 */
18
 */
19
package org.apache.flume.sink.hbase;
19
package org.apache.flume.sink.hbase;
20

    
   
20

   

    
   
21
import java.util.Arrays;

    
   
22
import java.util.Collection;

    
   
23
import java.util.Comparator;
21
import java.util.List;
24
import java.util.List;

    
   
25
import java.util.Map;

    
   
26
import java.util.Set;
22
import java.util.concurrent.CountDownLatch;
27
import java.util.concurrent.CountDownLatch;
23
import java.util.concurrent.ExecutorService;
28
import java.util.concurrent.ExecutorService;
24
import java.util.concurrent.Executors;
29
import java.util.concurrent.Executors;
25
import java.util.concurrent.atomic.AtomicBoolean;
30
import java.util.concurrent.atomic.AtomicBoolean;
26

    
   
31

   
27
import com.google.common.annotations.VisibleForTesting;
32
import com.google.common.annotations.VisibleForTesting;

    
   
33
import com.google.common.collect.Maps;

    
   
34
import com.google.common.primitives.UnsignedBytes;
28
import com.google.common.util.concurrent.ThreadFactoryBuilder;
35
import com.google.common.util.concurrent.ThreadFactoryBuilder;
29
import org.apache.flume.Channel;
36
import org.apache.flume.Channel;
30
import org.apache.flume.Context;
37
import org.apache.flume.Context;
31
import org.apache.flume.Event;
38
import org.apache.flume.Event;
32
import org.apache.flume.EventDeliveryException;
39
import org.apache.flume.EventDeliveryException;
[+20] [20] 78 lines
[+20] [+] public class AsyncHBaseSink extends AbstractSink implements Configurable {
111
  private SinkCounter sinkCounter;
118
  private SinkCounter sinkCounter;
112
  private long timeout;
119
  private long timeout;
113
  private String zkQuorum;
120
  private String zkQuorum;
114
  private String zkBaseDir;
121
  private String zkBaseDir;
115
  private ExecutorService sinkCallbackPool;
122
  private ExecutorService sinkCallbackPool;
116
  private boolean isTest;
123
  private boolean isTimeoutTest;

    
   
124
  private boolean isCoalesceTest;
117
  private boolean enableWal = true;
125
  private boolean enableWal = true;

    
   
126
  private boolean batchIncrements = false;

    
   
127
  private volatile int totalCallbacksReceived = 0;

    
   
128
  private Map<CellIdentifier, AtomicIncrementRequest> incrementBuffer;

    
   
129

   

    
   
130
  // Does not need to be thread-safe. Always called only from the sink's

    
   
131
  // process method.

    
   
132
  private final Comparator<byte[]> COMPARATOR = UnsignedBytes

    
   
133
    .lexicographicalComparator();
118

    
   
134

   
119
  public AsyncHBaseSink(){
135
  public AsyncHBaseSink(){
120
    this(null);
136
    this(null);
121
  }
137
  }
122

    
   
138

   
123
  public AsyncHBaseSink(Configuration conf) {
139
  public AsyncHBaseSink(Configuration conf) {
124
    this(conf, false);
140
    this(conf, false, false);
125
  }
141
  }
126

    
   
142

   
127
  AsyncHBaseSink(Configuration conf, boolean isTimeoutTesting) {
143
  AsyncHBaseSink(Configuration conf, boolean isTimeoutTesting,

    
   
144
    boolean isCoalesceTest) {
128
    this.conf = conf;
145
    this.conf = conf;
129
    isTest = isTimeoutTesting;
146
    isTimeoutTest = isTimeoutTesting;

    
   
147
    this.isCoalesceTest = isCoalesceTest;
130
  }
148
  }
131

    
   
149

   
132
  @Override
150
  @Override
133
  public Status process() throws EventDeliveryException {
151
  public Status process() throws EventDeliveryException {
134
    /*
152
    /*
[+20] [20] 10 lines
[+20] public Status process() throws EventDeliveryException {
145
    AtomicBoolean txnFail = new AtomicBoolean(false);
163
    AtomicBoolean txnFail = new AtomicBoolean(false);
146
    AtomicInteger callbacksReceived = new AtomicInteger(0);
164
    AtomicInteger callbacksReceived = new AtomicInteger(0);
147
    AtomicInteger callbacksExpected = new AtomicInteger(0);
165
    AtomicInteger callbacksExpected = new AtomicInteger(0);
148
    final Lock lock = new ReentrantLock();
166
    final Lock lock = new ReentrantLock();
149
    final Condition condition = lock.newCondition();
167
    final Condition condition = lock.newCondition();

    
   
168
    if (incrementBuffer != null) {

    
   
169
      incrementBuffer.clear();

    
   
170
    }
150
    /*
171
    /*
151
     * Callbacks can be reused per transaction, since they share the same
172
     * Callbacks can be reused per transaction, since they share the same
152
     * locks and conditions.
173
     * locks and conditions.
153
     */
174
     */
154
    Callback<Object, Object> putSuccessCallback =
175
    Callback<Object, Object> putSuccessCallback =
[+20] [20] 28 lines
[+20] public Status process() throws EventDeliveryException {
183
          break;
204
          break;
184
        } else {
205
        } else {
185
          serializer.setEvent(event);
206
          serializer.setEvent(event);
186
          List<PutRequest> actions = serializer.getActions();
207
          List<PutRequest> actions = serializer.getActions();
187
          List<AtomicIncrementRequest> increments = serializer.getIncrements();
208
          List<AtomicIncrementRequest> increments = serializer.getIncrements();
188
          callbacksExpected.addAndGet(actions.size() + increments.size());
209
          callbacksExpected.addAndGet(actions.size());

    
   
210
          if(!batchIncrements) {

    
   
211
            callbacksExpected.addAndGet(increments.size());

    
   
212
          }
189

    
   
213

   
190
          for (PutRequest action : actions) {
214
          for (PutRequest action : actions) {
191
            action.setDurable(enableWal);
215
            action.setDurable(enableWal);
192
            client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
216
            client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
193
          }
217
          }
194
          for (AtomicIncrementRequest increment : increments) {
218
          for (AtomicIncrementRequest increment : increments) {

    
   
219
            if(batchIncrements) {

    
   
220
              CellIdentifier identifier = new CellIdentifier(increment.key(),

    
   
221
                increment.qualifier());

    
   
222
              if (!incrementBuffer.containsKey(identifier)) {

    
   
223
                incrementBuffer.put(identifier, increment);

    
   
224
              } else {

    
   
225
                AtomicIncrementRequest request

    
   
226
                  = incrementBuffer.get(identifier);

    
   
227
                request.setAmount(request.getAmount() + increment.getAmount());

    
   
228
              }

    
   
229
            } else {
195
            client.atomicIncrement(increment).addCallbacks(
230
              client.atomicIncrement(increment).addCallbacks(
196
                    incrementSuccessCallback, incrementFailureCallback);
231
                incrementSuccessCallback, incrementFailureCallback);
197
          }
232
            }
198
        }
233
          }
199
      }
234
        }

    
   
235
      }

    
   
236
      if(batchIncrements) {

    
   
237
        Collection<AtomicIncrementRequest> increments = incrementBuffer.values();

    
   
238
        for(AtomicIncrementRequest increment : increments) {

    
   
239
          client.atomicIncrement(increment).addCallbacks(

    
   
240
            incrementSuccessCallback, incrementFailureCallback);

    
   
241
        }

    
   
242
        callbacksExpected.addAndGet(increments.size());

    
   
243
      }
200
      client.flush();
244
      client.flush();
201
    } catch (Throwable e) {
245
    } catch (Throwable e) {
202
      this.handleTransactionFailure(txn);
246
      this.handleTransactionFailure(txn);
203
      this.checkIfChannelExceptionAndThrow(e);
247
      this.checkIfChannelExceptionAndThrow(e);
204
    }
248
    }
[+20] [20] 24 lines
[+20] public Status process() throws EventDeliveryException {
229
      }
273
      }
230
    } finally {
274
    } finally {
231
      lock.unlock();
275
      lock.unlock();
232
    }
276
    }
233

    
   
277

   

    
   
278
    if(isCoalesceTest) {

    
   
279
      totalCallbacksReceived += callbacksReceived.get();

    
   
280
    }

    
   
281

   
234
    /*
282
    /*
235
     * At this point, either the txn has failed
283
     * At this point, either the txn has failed
236
     * or all callbacks received and txn is successful.
284
     * or all callbacks received and txn is successful.
237
     *
285
     *
238
     * This need not be in the monitor, since all callbacks for this txn
286
     * This need not be in the monitor, since all callbacks for this txn
[+20] [20] 93 lines
[+20] [+] public void configure(Context context) {
332
    if(!enableWal) {
380
    if(!enableWal) {
333
      logger.warn("AsyncHBaseSink's enableWal configuration is set to false. " +
381
      logger.warn("AsyncHBaseSink's enableWal configuration is set to false. " +
334
        "All writes to HBase will have WAL disabled, and any data in the " +
382
        "All writes to HBase will have WAL disabled, and any data in the " +
335
        "memstore of this region in the Region Server could be lost!");
383
        "memstore of this region in the Region Server could be lost!");
336
    }
384
    }

    
   
385

   

    
   
386
    batchIncrements = context.getBoolean(

    
   
387
      HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,

    
   
388
      HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);

    
   
389

   

    
   
390
    if(batchIncrements) {

    
   
391
      incrementBuffer = Maps.newHashMap();

    
   
392
      logger.info("Increment coalescing is enabled. Increments will be " +

    
   
393
        "buffered.");

    
   
394
    }

    
   
395
  }

    
   
396

   

    
   
397
  @VisibleForTesting

    
   
398
  int getTotalCallbacksReceived() {

    
   
399
    return totalCallbacksReceived;
337
  }
400
  }
338

    
   
401

   
339
  @VisibleForTesting
402
  @VisibleForTesting
340
  boolean isConfNull() {
403
  boolean isConfNull() {
341
    return conf == null;
404
    return conf == null;
342
  }
405
  }
343
  @Override
406
  @Override
344
  public void start(){
407
  public void start(){
345
    Preconditions.checkArgument(client == null, "Please call stop "
408
    Preconditions.checkArgument(client == null, "Please call stop "
346
            + "before calling start on an old instance.");
409
            + "before calling start on an old instance.");
347
    sinkCounter.start();
410
    sinkCounter.start();
348
    sinkCounter.incrementConnectionCreatedCount();
411
    sinkCounter.incrementConnectionCreatedCount();
349
    if (!isTest) {
412
    if (!isTimeoutTest) {
350
      sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
413
      sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
351
        .setNameFormat(this.getName() + " HBase Call Pool").build());
414
        .setNameFormat(this.getName() + " HBase Call Pool").build());
352
    } else {
415
    } else {
353
      sinkCallbackPool = Executors.newSingleThreadExecutor();
416
      sinkCallbackPool = Executors.newSingleThreadExecutor();
354
    }
417
    }
[+20] [20] 90 lines
[+20] [+] private class SuccessCallback<R,T> implements Callback<R,T> {
445
    public SuccessCallback(Lock lck, AtomicInteger callbacksReceived,
508
    public SuccessCallback(Lock lck, AtomicInteger callbacksReceived,
446
            Condition condition) {
509
            Condition condition) {
447
      lock = lck;
510
      lock = lck;
448
      this.callbacksReceived = callbacksReceived;
511
      this.callbacksReceived = callbacksReceived;
449
      this.condition = condition;
512
      this.condition = condition;
450
      isTimeoutTesting = isTest;
513
      isTimeoutTesting = isTimeoutTest;
451
    }
514
    }
452

    
   
515

   
453
    @Override
516
    @Override
454
    public R call(T arg) throws Exception {
517
    public R call(T arg) throws Exception {
455
      if (isTimeoutTesting) {
518
      if (isTimeoutTesting) {
[+20] [20] 29 lines
[+20] [+] private class FailureCallback<R,T> implements Callback<R,T> {
485
            AtomicBoolean txnFail, Condition condition){
548
            AtomicBoolean txnFail, Condition condition){
486
      this.lock = lck;
549
      this.lock = lck;
487
      this.callbacksReceived = callbacksReceived;
550
      this.callbacksReceived = callbacksReceived;
488
      this.txnFail = txnFail;
551
      this.txnFail = txnFail;
489
      this.condition = condition;
552
      this.condition = condition;
490
      isTimeoutTesting = isTest;
553
      isTimeoutTesting = isTimeoutTest;
491
    }
554
    }
492

    
   
555

   
493
    @Override
556
    @Override
494
    public R call(T arg) throws Exception {
557
    public R call(T arg) throws Exception {
495
      if (isTimeoutTesting) {
558
      if (isTimeoutTesting) {
[+20] [20] 27 lines
[+20] [+] private void checkIfChannelExceptionAndThrow(Throwable e)
523
    } else if (e instanceof Error || e instanceof RuntimeException) {
586
    } else if (e instanceof Error || e instanceof RuntimeException) {
524
      Throwables.propagate(e);
587
      Throwables.propagate(e);
525
    }
588
    }
526
    throw new EventDeliveryException("Error in processing transaction.", e);
589
    throw new EventDeliveryException("Error in processing transaction.", e);
527
  }
590
  }

    
   
591

   

    
   
592
  private class CellIdentifier {

    
   
593
    private final byte[] row;

    
   
594
    private final byte[] column;

    
   
595
    private final int hashCode;

    
   
596
    // Since the sink operates only on one table and one cf,

    
   
597
    // we use the data from the owning sink

    
   
598
    public CellIdentifier(byte[] row, byte[] column) {

    
   
599
      this.row = row;

    
   
600
      this.column = column;

    
   
601
      this.hashCode =

    
   
602
        (Arrays.hashCode(row) * 31) * (Arrays.hashCode(column) * 31);

    
   
603
    }

    
   
604

   

    
   
605
    @Override

    
   
606
    public int hashCode() {

    
   
607
      return hashCode;

    
   
608
    }

    
   
609

   

    
   
610
    // Since we know that this class is used from only this class,

    
   
611
    // skip the class comparison to save time

    
   
612
    @Override

    
   
613
    public boolean equals(Object other) {

    
   
614
      CellIdentifier o = (CellIdentifier) other;

    
   
615
      if (other == null) {

    
   
616
        return false;

    
   
617
      } else {

    
   
618
        return (COMPARATOR.compare(row, o.row) == 0

    
   
619
          && COMPARATOR.compare(column, o.column) == 0);

    
   
620
      }

    
   
621
    }

    
   
622
  }
528
}
623
}
flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
Revision 7fdc75b New Change
 
flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java
New File
 
flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
Revision a0c04eb New Change
 
  1. flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java: Loading...
  2. flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java: Loading...
  3. flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java: Loading...
  4. flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java: Loading...