Review Board 1.7.22


FLUME-1937: Issue with maxUnderReplication in HDFS sink

Review Request #9838 - Created March 9, 2013 and submitted

Mike Percy
FLUME-1937
Reviewers
Flume
flume-git
maxUnderReplication is not respected
New unit test
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
Revision c11fb20 New Change
[20] 89 lines
[+20] [+] class BucketWriter {
90
  private volatile String bucketPath;
90
  private volatile String bucketPath;
91
  private volatile String targetPath;
91
  private volatile String targetPath;
92
  private volatile long batchCounter;
92
  private volatile long batchCounter;
93
  private volatile boolean isOpen;
93
  private volatile boolean isOpen;
94
  private volatile boolean isUnderReplicated;
94
  private volatile boolean isUnderReplicated;
95
  private volatile int consecutiveUnderReplRotateCount;
95
  private volatile int consecutiveUnderReplRotateCount = 0;
96
  private volatile ScheduledFuture<Void> timedRollFuture;
96
  private volatile ScheduledFuture<Void> timedRollFuture;
97
  private SinkCounter sinkCounter;
97
  private SinkCounter sinkCounter;
98
  private final int idleTimeout;
98
  private final int idleTimeout;
99
  private volatile ScheduledFuture<Void> idleFuture;
99
  private volatile ScheduledFuture<Void> idleFuture;
100
  private final WriterCallback onIdleCallback;
100
  private final WriterCallback onIdleCallback;
[+20] [20] 90 lines
[+20] [+] private void open() throws IOException, InterruptedException {
191
      public Void run() throws Exception {
191
      public Void run() throws Exception {
192
        doOpen();
192
        doOpen();
193
        return null;
193
        return null;
194
      }
194
      }
195
    });
195
    });
196

    
   

   
197
    // ensure new files reset under-rep rotate count

   
198
    consecutiveUnderReplRotateCount = 0;

   
199
  }
196
  }
200

    
   
197

   
201
  /**
198
  /**
202
   * doOpen() must only be called by open()
199
   * doOpen() must only be called by open()
203
   * @throws IOException
200
   * @throws IOException
[+20] [20] 370 lines
[+20] [+] private static void checkAndThrowInterruptedException()
574
        return future.get();
571
        return future.get();
575
      }
572
      }
576
    } catch (TimeoutException eT) {
573
    } catch (TimeoutException eT) {
577
      future.cancel(true);
574
      future.cancel(true);
578
      sinkCounter.incrementConnectionFailedCount();
575
      sinkCounter.incrementConnectionFailedCount();
579
      throw new IOException("Callable timed out after " + callTimeout + " ms",
576
      throw new IOException("Callable timed out after " + callTimeout + " ms" +

    
   
577
          " on file: " + bucketPath,
580
        eT);
578
        eT);
581
    } catch (ExecutionException e1) {
579
    } catch (ExecutionException e1) {
582
      sinkCounter.incrementConnectionFailedCount();
580
      sinkCounter.incrementConnectionFailedCount();
583
      Throwable cause = e1.getCause();
581
      Throwable cause = e1.getCause();
584
      if (cause instanceof IOException) {
582
      if (cause instanceof IOException) {
[+20] [20] 20 lines
flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
Revision c2b96f7 New Change
 
  1. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java: Loading...
  2. flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java: Loading...