Review Board 1.7.22


FLUME-1649. Improve HBaseSink performance.

Review Request #7657 - Created Oct. 18, 2012 and updated

Submitter: Hari Shreedharan
Bugs: FLUME-1649
Reviewers: Flume
Repository: flume-git

Description:
Modify the HBase sink to use multiple threads to wait on RPC calls. Since those threads spend most of their time blocked on HBase, this does not result in too many running threads.

Testing Done:
All current unit tests pass. Did functional testing.
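Not part of the patch: below is a minimal, self-contained sketch of the submit/take pattern the change applies in HBaseSink.process(). The class name CompletionServiceSketch and the sleeping Runnable are hypothetical stand-ins for the real PutRunnable and the blocking table.batch()/table.increment() calls; the point is that a cached thread pool wrapped in an ExecutorCompletionService lets the sink issue many blocking calls in parallel and then wait for each one to complete before committing.

// Illustrative sketch only (not the patch): submit blocking work to an
// ExecutorCompletionService, then take one completion per submitted task.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CompletionServiceSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService threadPool = Executors.newCachedThreadPool();
    ExecutorCompletionService<Object> completionService =
        new ExecutorCompletionService<Object>(threadPool);
    final AtomicBoolean failed = new AtomicBoolean(false);

    int submitted = 0;
    for (int i = 0; i < 10; i++) {
      completionService.submit(new Runnable() {
        @Override
        public void run() {
          try {
            Thread.sleep(50); // stand-in for a blocking HBase RPC
          } catch (InterruptedException e) {
            failed.set(true);
            Thread.currentThread().interrupt();
          }
        }
      }, null);
      submitted++;
    }

    // Wait for every submitted task; a real sink would roll back its
    // transaction if any task flagged a failure.
    for (int i = 0; i < submitted; i++) {
      try {
        completionService.take().get();
      } catch (ExecutionException e) {
        failed.set(true);
      }
    }
    System.out.println(failed.get() ? "at least one task failed" : "all tasks completed");

    threadPool.shutdown();
    threadPool.awaitTermination(5, TimeUnit.SECONDS);
  }
}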

Diff revision 3 (Latest)


flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
Revision 021ecd0 → New Change

  ... 15 unchanged lines ...

  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.flume.sink.hbase;

+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
-import java.util.LinkedList;
 import java.util.List;
-
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-

 /**
  *
  * A simple sink which reads events from a channel and writes them to HBase.
  * The Hbase configution is picked up from the first <tt>hbase-site.xml</tt>

  ... 36 unchanged lines ...  (in: public class HBaseSink extends AbstractSink implements Configurable)

   private CounterGroup counterGroup = new CounterGroup();
   private static final Logger logger = LoggerFactory.getLogger(HBaseSink.class);
   private HbaseEventSerializer serializer;
   private String eventSerializerType;
   private Context serializerContext;
+  private ExecutorService threadPool;
+  private ExecutorCompletionService<Object> threadPoolService;
+  private SinkCounter sinkCounter;

   public HBaseSink(){
     this(HBaseConfiguration.create());
   }

  ... 28 unchanged lines ...  (in: public void start())

       throw new FlumeException("Error getting column family from HBase." +
           "Please verify that the table "+ tableName +" and Column Family, "
           + Bytes.toString(columnFamily) + " exists in HBase.", e);
     }

+    threadPool = Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder()
+            .setNameFormat("hbase-sink-runnable-%d").build());
+    threadPoolService = new ExecutorCompletionService<Object>(threadPool);
+    sinkCounter.start();
+    sinkCounter.incrementConnectionCreatedCount();
     super.start();
   }

   @Override
   public void stop(){
     try {
+      threadPool.shutdown();
+      if(!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
+        threadPool.shutdownNow();
+      }
+      while(!threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
+        logger.info("Waiting for HBaseSink to shutdown.");
+      }
       table.close();
       table = null;
+    } catch (InterruptedException ex) {
+      logger.error("Interrupted while waiting for HBaseSink to stop.", ex);
+      throw new FlumeException("Interrupted while waiting "
+              + "for HBaseSink to stop.", ex);
     } catch (IOException e) {
       throw new FlumeException("Error closing table.", e);
     }
+    sinkCounter.stop();
   }

   @SuppressWarnings("unchecked")
   @Override
   public void configure(Context context){

  ... 27 unchanged lines ...

       serializer.configure(serializerContext);
     } catch (Exception e) {
       logger.error("Could not instantiate event serializer." , e);
       Throwables.propagate(e);
     }
+    if (sinkCounter == null) {
+      sinkCounter = new SinkCounter(this.getName());
+    }
   }

   @Override
   public Status process() throws EventDeliveryException {
     Status status = Status.READY;
     Channel channel = getChannel();
     Transaction txn = channel.getTransaction();
-    List<Row> actions = new LinkedList<Row>();
-    List<Increment> incs = new LinkedList<Increment>();
+    final AtomicBoolean txnFail = new AtomicBoolean(false);
+    int runnableCount = 0;
+    List<Future<Object>> futures = Lists.newArrayList();
     txn.begin();
-    for(long i = 0; i < batchSize; i++) {
+    int i = 0;
+    for(; i < batchSize; i++) {
       Event event = channel.take();
       if(event == null){
         status = Status.BACKOFF;
-        counterGroup.incrementAndGet("channel.underflow");
+        if (i == 0) {
+          sinkCounter.incrementBatchEmptyCount();
+        } else {
+          sinkCounter.incrementBatchUnderflowCount();
+        }
         break;
       } else {
         serializer.initialize(event, columnFamily);
-        actions.addAll(serializer.getActions());
-        incs.addAll(serializer.getIncrements());
-      }
-    }
-    putEventsAndCommit(actions, incs, txn);
-    return status;
-  }
-
-  private void putEventsAndCommit(List<Row> actions, List<Increment> incs,
-      Transaction txn) throws EventDeliveryException {
-    try {
-      table.batch(actions);
-      for(Increment i: incs){
-        table.increment(i);
-      }
+        futures.add(threadPoolService.submit(
+                new PutRunnable(txnFail, serializer.getActions()), null));
+        runnableCount++;
+        for(Increment increment : serializer.getIncrements()) {
+          futures.add(threadPoolService.submit(
+                  new PutRunnable(txnFail, increment), null));
+          runnableCount++;
+        }
+      }
+    }
+    int eventCount = i;
+    if (eventCount == batchSize) {
+      sinkCounter.incrementBatchCompleteCount();
+    }
+    sinkCounter.addToEventDrainAttemptCount(eventCount);
+    for(i = 0; i < runnableCount; i++) {
+      Future<Object> future = null;
+      try {
+        future = threadPoolService.take();
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        logger.error("Sink: " + getName() + " interrupted while waiting for"
+                + " Hbase to complete operations.", ex);
+        txnFail.set(true);
+      }
+      if(txnFail.get()) {
+        //The transaction has failed. Clear up all the pending tasks
+        //and throw exception.
+        for(Future<Object> current:futures) {
+          if(!(current.isCancelled() ||  current.isDone())) {
+            current.cancel(true);
+          }
+        }
+        try{
+          txn.rollback();
+        } finally {
+          txn.close();
+        }
+        try {
+          future.get();
+        } catch (ExecutionException ex) {
+          throw new EventDeliveryException("Error while writing to HBase.",
+                  ex.getCause());
+        } catch (Exception ex) {
+          throw new EventDeliveryException("Error while writing to HBase.", ex);
+        }
+      }
+    }
+    try {
       txn.commit();
-      counterGroup.incrementAndGet("transaction.success");
-    } catch (Throwable e) {
+      sinkCounter.addToEventDrainSuccessCount((long)eventCount);
+    } catch (Throwable ex) {
+      logger.warn("Transaction commit failed. "
+              + "Transaction will be rolled back.", ex);
       try{
         txn.rollback();
-      } catch (Exception e2) {
-        logger.error("Exception in rollback. Rollback might not have been" +
-            "successful." , e2);
-      }
-      counterGroup.incrementAndGet("transaction.rollback");
-      logger.error("Failed to commit transaction." +
-          "Transaction rolled back.", e);
-      if(e instanceof Error || e instanceof RuntimeException){
-        logger.error("Failed to commit transaction." +
-            "Transaction rolled back.", e);
-        Throwables.propagate(e);
-      } else {
-        logger.error("Failed to commit transaction." +
-            "Transaction rolled back.", e);
-        throw new EventDeliveryException("Failed to commit transaction." +
-            "Transaction rolled back.", e);
-      }
+      } catch (Throwable ex1) {
+        logger.error("Error while rolling back transaction. "
+                + "Rollback may have failed.", ex1);
       }
+      throw new EventDeliveryException("Failed to commit transaction.", ex);
     } finally {
       txn.close();
     }
+    return status;
+  }
+
+  private class PutRunnable implements Runnable {
+
+    private final AtomicBoolean txnFail;
+    private final List<Row> actions;
+    private final Increment increment;
+    private final String action;
+
+    PutRunnable(AtomicBoolean fail, List<Row> puts) {
+      txnFail = fail;
+      actions = puts;
+      increment = null;
+      action = "puts";
+    }
+
+    PutRunnable(AtomicBoolean fail, Increment inc) {
+      txnFail = fail;
+      increment = inc;
+      actions = null;
+      action = "increment";
+    }
+
+    @Override
+    public void run(){
+      try {
+        if(actions != null) {
+          table.batch(actions);
+        } else {
+          table.increment(increment);
+        }
+        table.flushCommits();
+      } catch (InterruptedException ex) {
+        logger.warn("Sink: " + getName() + " interrupted while "
+                + "waiting for " + action + " to complete. "
+                + "Transaction will be rolled back.", ex);
+        txnFail.set(true);
+        Throwables.propagate(ex);
+      } catch (IOException ex) {
+        logger.warn("IOException in sink: " + getName() + ". "
+                + "Transaction will be rolled back.", ex);
+        txnFail.set(true);
+      } catch (Throwable ex) {
+        logger.warn("Unknown error in sink: " + getName() + ". "
+                + "Transaction will be rolled back.", ex);
+        txnFail.set(true);
+        Throwables.propagate(ex);
+      }
+    }
   }
 }