Review Board 1.7.22


FLUME-1864: Allow hdfs idle callback to clean up closed bucket writers

Review Request #9052 - Created Jan. 22, 2013 and updated

Juhani Connolly
Reviewers
Flume
flume-git
Stops close() from removing the idle callback and skips close in the idle callback if the writer is already closed.
Tests pass. Since sfWriters is protected it is hard to verify the changed behavior in unit tests. I however verified the behaviour in the java debugger.
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
Revision 3f31ef2 New Change
[20] 308 lines
[+20] [+] private void doClose() throws IOException {
309
    if (timedRollFuture != null && !timedRollFuture.isDone()) {
309
    if (timedRollFuture != null && !timedRollFuture.isDone()) {
310
      timedRollFuture.cancel(false); // do not cancel myself if running!
310
      timedRollFuture.cancel(false); // do not cancel myself if running!
311
      timedRollFuture = null;
311
      timedRollFuture = null;
312
    }
312
    }
313

    
   
313

   
314
    if(idleFuture != null && !idleFuture.isDone()) {

   
315
      idleFuture.cancel(false);

   
316
      idleFuture = null;

   
317
    }

   
318

    
   

   
319
    if (bucketPath != null && fileSystem != null) {
314
    if (bucketPath != null && fileSystem != null) {
320
      renameBucket(); // could block or throw IOException
315
      renameBucket(); // could block or throw IOException
321
      fileSystem = null;
316
      fileSystem = null;
322
    }
317
    }
323
  }
318
  }
[+20] [20] 18 lines
[+20] [+] public Void run() throws Exception {
342
        // or been cancelled
337
        // or been cancelled
343
        if(idleFuture == null || idleFuture.cancel(false)) {
338
        if(idleFuture == null || idleFuture.cancel(false)) {
344
          Callable<Void> idleAction = new Callable<Void>() {
339
          Callable<Void> idleAction = new Callable<Void>() {
345
            public Void call() throws Exception {
340
            public Void call() throws Exception {
346
              try {
341
              try {

    
   
342
                if(isOpen) {
347
                LOG.info("Closing idle bucketWriter {}", bucketPath);
343
                  LOG.info("Closing idle bucketWriter {}", bucketPath);
348
                idleClosed = true;
344
                  idleClosed = true;
349
                close();
345
                  close();

    
   
346
                }
350
                if(onIdleCallback != null)
347
                if(onIdleCallback != null)
351
                  onIdleCallback.run(onIdleCallbackPath);
348
                  onIdleCallback.run(onIdleCallbackPath);
352
              } catch(Throwable t) {
349
              } catch(Throwable t) {
353
                LOG.error("Unexpected error", t);
350
                LOG.error("Unexpected error", t);
354
              }
351
              }
[+20] [20] 139 lines
  1. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java: Loading...