Review Board 1.7.22


FLUME-1950. AsyncHBaseSink reconfiguration can fail if serializer.cleanUp() throws or if hbase client is not shutdown correctly

Review Request #9921 - Created March 14, 2013 and updated

Hari Shreedharan
FLUME-1950
Reviewers
Flume
flume-git
Retry close of hbase client if it fails. Also enclose serializer clean up in try-catch
All current tests pass.
flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
Revision 7020fcd New Change
[20] 390 lines
[+20] [+] public Object call(Object arg) throws Exception {
391
    super.start();
391
    super.start();
392
  }
392
  }
393

    
   
393

   
394
  @Override
394
  @Override
395
  public void stop(){
395
  public void stop(){
396
    serializer.cleanUp();
396
    final AtomicBoolean notClosed = new AtomicBoolean(true);
397
    client.shutdown();
397
    int tries = 0;

    
   
398
    while (notClosed.get() && ++tries <= 3) {

    
   
399
      final CountDownLatch countDownLatch = new CountDownLatch(1);

    
   
400
      client.shutdown().addCallbacks(

    
   
401
        new Callback<Object, Object>() {

    
   
402
          @Override

    
   
403
          public Object call(Object arg) throws Exception {

    
   
404
            notClosed.set(false);

    
   
405
            countDownLatch.countDown();

    
   
406
            return null;

    
   
407
          }

    
   
408
        }, new Callback<Object, Object>() {

    
   
409
          @Override

    
   
410
          public Object call(Object arg) throws Exception {

    
   
411
            countDownLatch.countDown();

    
   
412
            return null;

    
   
413
          }

    
   
414
        }

    
   
415
      );

    
   
416
      try {

    
   
417
        countDownLatch.await();

    
   
418
      } catch (InterruptedException e) {

    
   
419
        logger.error("Interrupted while HBase Client was being shutdown", e);

    
   
420
        break;

    
   
421
      }

    
   
422
    }
398
    sinkCounter.incrementConnectionClosedCount();
423
    sinkCounter.incrementConnectionClosedCount();
399
    sinkCounter.stop();
424
    sinkCounter.stop();
400
    sinkCallbackPool.shutdown();
425
    sinkCallbackPool.shutdown();
401
    try {
426
    try {
402
      if(!sinkCallbackPool.awaitTermination(5, TimeUnit.SECONDS)) {
427
      if(!sinkCallbackPool.awaitTermination(5, TimeUnit.SECONDS)) {
[+20] [20] 6 lines
[+20] public Object call(Object arg) throws Exception {
409
    }
434
    }
410
    sinkCallbackPool = null;
435
    sinkCallbackPool = null;
411
    client = null;
436
    client = null;
412
    conf = null;
437
    conf = null;
413
    open = false;
438
    open = false;

    
   
439
    try {

    
   
440
      serializer.cleanUp();

    
   
441
    } catch (Exception ex) {

    
   
442
      logger.error("Error while shutting down the Hbase Sink Serializer.", ex);

    
   
443
    }
414
    super.stop();
444
    super.stop();
415
  }
445
  }
416

    
   
446

   
417
  private void handleTransactionFailure(Transaction txn)
447
  private void handleTransactionFailure(Transaction txn)
418
      throws EventDeliveryException {
448
      throws EventDeliveryException {
[+20] [20] 109 lines
  1. flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java: Loading...