Review Board 1.7.22


FLUME-1925. HDFS timeouts should not starve other threads

Review Request #9568 - Created Feb. 23, 2013 and submitted

Submitter: Hari Shreedharan
Bugs: FLUME-1925
Reviewers: Flume
Repository: flume-git

Description
Only wrap the real HDFS calls in timeout callables.

Testing Done
All current tests pass.
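
The core pattern of the patch: each potentially blocking HDFS call is submitted to a dedicated executor and awaited with a bounded Future.get(), so a hung NameNode/DataNode call can no longer pin the sink's caller thread. A minimal standalone sketch of that pattern (class and variable names here are illustrative, not from the patch):

    import java.io.IOException;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class TimeoutWrapSketch {
      private static final ExecutorService POOL = Executors.newFixedThreadPool(4);

      static <T> T callWithTimeout(Callable<T> callable, long timeoutMs)
          throws Exception {
        Future<T> future = POOL.submit(callable); // run the call on a worker thread
        try {
          return future.get(timeoutMs, TimeUnit.MILLISECONDS); // bounded wait
        } catch (TimeoutException e) {
          future.cancel(true); // interrupt the stuck call
          throw new IOException("call timed out after " + timeoutMs + " ms", e);
        }
      }
    }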
Diffs
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java (Revision 0786857)
  ... 18 unchanged lines ...
 package org.apache.flume.sink.hdfs;

 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;

 import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.SystemClock;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
  ... 48 unchanged lines (class BucketWriter) ...
   private SinkCounter sinkCounter;
   private final int idleTimeout;
   private volatile ScheduledFuture<Void> idleFuture;
   private final WriterCallback onIdleCallback;
   private final String onIdleCallbackPath;
+  private final long callTimeout;
+  private final ExecutorService callTimeoutPool;

   private Clock clock = new SystemClock();

   // flag that the bucket writer was closed due to idling and thus shouldn't be
   // reopened. Not ideal, but avoids internals of owners
   protected boolean idleClosed = false;

   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
-      Context context, String filePath, String fileName, String inUsePrefix,
-      String inUseSuffix, String fileSuffix, CompressionCodec codeC,
-      CompressionType compType, HDFSWriter writer,
-      ScheduledExecutorService timedRollerPool, UserGroupInformation user,
-      SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback,
-      String onIdleCallbackPath) {
+    Context context, String filePath, String fileName, String inUsePrefix,
+    String inUseSuffix, String fileSuffix, CompressionCodec codeC,
+    CompressionType compType, HDFSWriter writer,
+    ScheduledExecutorService timedRollerPool, UserGroupInformation user,
+    SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback,
+    String onIdleCallbackPath, long callTimeout,
+    ExecutorService callTimeoutPool) {
     this.rollInterval = rollInterval;
     this.rollSize = rollSize;
     this.rollCount = rollCount;
     this.batchSize = batchSize;
     this.filePath = filePath;
  ... 8 unchanged lines ...
     this.user = user;
     this.sinkCounter = sinkCounter;
     this.idleTimeout = idleTimeout;
     this.onIdleCallback = onIdleCallback;
     this.onIdleCallbackPath = onIdleCallbackPath;
+    this.callTimeout = callTimeout;
+    this.callTimeoutPool = callTimeoutPool;
     fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());

     isOpen = false;
     this.writer.configure(context);
   }
  ... 57 unchanged lines (public Void run() throws Exception) ...
   private void doOpen() throws IOException, InterruptedException {
     if ((filePath == null) || (writer == null)) {
       throw new IOException("Invalid file settings");
     }

-    Configuration config = new Configuration();
+    final Configuration config = new Configuration();
     // disable FileSystem JVM shutdown hook
     config.setBoolean("fs.automatic.close", false);

     // Hadoop is not thread safe when doing certain RPC operations,
     // including getFileSystem(), when running under Kerberos.
  ... 19 unchanged lines (private void doOpen()) ...
         bucketPath = filePath + DIRECTORY_DELIMITER + inUsePrefix
           + fullFileName + inUseSuffix;
         targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;

         LOG.info("Creating " + bucketPath);
+        callWithTimeout(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
-        if (codeC == null) {
-          // Need to get reference to FS using above config before underlying
-          // writer does in order to avoid shutdown hook & IllegalStateExceptions
-          fileSystem = new Path(bucketPath).getFileSystem(config);
-          writer.open(bucketPath);
-        } else {
-          // need to get reference to FS before writer does to avoid shutdown hook
-          fileSystem = new Path(bucketPath).getFileSystem(config);
-          writer.open(bucketPath, codeC, compType);
-        }
+            if (codeC == null) {
+              // Need to get reference to FS using above config before underlying
+              // writer does in order to avoid shutdown hook & IllegalStateExceptions
+              fileSystem = new Path(bucketPath).getFileSystem(config);
+              writer.open(bucketPath);
+            } else {
+              // need to get reference to FS before writer does to avoid shutdown hook
+              fileSystem = new Path(bucketPath).getFileSystem(config);
+              writer.open(bucketPath, codeC, compType);
+            }
+            return null;
+          }
+        });
       } catch (Exception ex) {
         sinkCounter.incrementConnectionFailedCount();
         if (ex instanceof IOException) {
           throw (IOException) ex;
         } else {
  ... 44 unchanged lines (public Void run() throws Exception) ...
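
Note that config becomes final above because the anonymous Callable captures it; the same change appears later for srcPath/dstPath in renameBucket() and the event parameter of append(). Pre-Java-8 javac only lets anonymous inner classes capture final locals. A minimal illustration (hypothetical names, not from the patch):

    import java.util.concurrent.Callable;

    class FinalCaptureSketch {
      Callable<String> makeCallable() {
        // A local captured by an anonymous inner class must be final (pre-Java 8).
        final String path = "/tmp/example"; // hypothetical value
        return new Callable<String>() {
          @Override
          public String call() {
            return path; // reads the captured final local
          }
        };
      }
    }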

   /**
    * doClose() must only be called by close()
    * @throws IOException
    */
-  private void doClose() throws IOException {
+  private void doClose() throws IOException, InterruptedException {
     LOG.debug("Closing {}", bucketPath);
     if (isOpen) {
       try {
+        callWithTimeout(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
-        writer.close(); // could block
+            writer.close(); // could block
+            return null;
+          }
+        });
         sinkCounter.incrementConnectionClosedCount();
       } catch (IOException e) {
         LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
             "). Exception follows.", e);
         sinkCounter.incrementConnectionFailedCount();
  ... 59 unchanged lines (public Void call() throws Exception) ...

   /**
    * doFlush() must only be called by flush()
    * @throws IOException
    */
-  private void doFlush() throws IOException {
+  private void doFlush() throws IOException, InterruptedException {
+    callWithTimeout(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
-    writer.sync(); // could block
+        writer.sync(); // could block
+        return null;
+      }
+    });
     batchCounter = 0;
   }

   /**
    * Open file handles, write data, update stats, handle file rolling and
  ... 5 unchanged lines ...
    * creation time reflects when the first event was written.
    *
    * @throws IOException
    * @throws InterruptedException
    */
-  public synchronized void append(Event event)
+  public synchronized void append(final Event event)
           throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
     if (!isOpen) {
       if(idleClosed) {
         throw new IOException("This bucket writer was closed due to idling and this handle " +
  ... 9 unchanged lines ...
     }

     // write the event
     try {
       sinkCounter.incrementEventDrainAttemptCount();
+      callWithTimeout(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
-      writer.append(event); // could block
+          writer.append(event); // could block
+          return null;
+        }
+      });
     } catch (IOException e) {
       LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
           bucketPath + ") and rethrowing exception.",
           e.getMessage());
       try {
  ... 35 unchanged lines (private boolean shouldRotate()) ...
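
If the wrapped writer.append() times out, the resulting IOException lands in the existing catch block above, which logs the failure and (in the collapsed lines) presumably closes the file and rethrows, so the event stays in the channel for retry.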
   }

   /**
    * Rename bucketPath file from .tmp to permanent location.
    */
-  private void renameBucket() throws IOException {
+  private void renameBucket() throws IOException, InterruptedException {
     if(bucketPath.equals(targetPath)) {
       return;
     }

-    Path srcPath = new Path(bucketPath);
-    Path dstPath = new Path(targetPath);
+    final Path srcPath = new Path(bucketPath);
+    final Path dstPath = new Path(targetPath);

+    callWithTimeout(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
-    if(fileSystem.exists(srcPath)) { // could block
-      LOG.info("Renaming " + srcPath + " to " + dstPath);
-      fileSystem.rename(srcPath, dstPath); // could block
-    }
+        if(fileSystem.exists(srcPath)) { // could block
+          LOG.info("Renaming " + srcPath + " to " + dstPath);
+          fileSystem.rename(srcPath, dstPath); // could block
+        }
+        return null;
+      }
+    });
   }

   @Override
   public String toString() {
     return "[ " + this.getClass().getSimpleName() + " targetPath = " + targetPath +
  ... 19 unchanged lines (private static void checkAndThrowInterruptedException()) ...
       throw new InterruptedException("Timed out before HDFS call was made. "
               + "Your hdfs.callTimeout might be set too low or HDFS calls are "
               + "taking too long.");
     }
   }

+  /**
+   * Execute the callable on a separate thread and wait for the completion
+   * for the specified amount of time in milliseconds. In case of timeout
+   * cancel the callable and throw an IOException
+   */
+  private <T> T callWithTimeout(Callable<T> callable)
+    throws IOException, InterruptedException {
+    Future<T> future = callTimeoutPool.submit(callable);
+    try {
+      if (callTimeout > 0) {
+        return future.get(callTimeout, TimeUnit.MILLISECONDS);
+      } else {
+        return future.get();
+      }
+    } catch (TimeoutException eT) {
+      future.cancel(true);
+      sinkCounter.incrementConnectionFailedCount();
+      throw new IOException("Callable timed out after " + callTimeout + " ms",
+        eT);
+    } catch (ExecutionException e1) {
+      sinkCounter.incrementConnectionFailedCount();
+      Throwable cause = e1.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else if (cause instanceof InterruptedException) {
+        throw (InterruptedException) cause;
+      } else if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
+      } else if (cause instanceof Error) {
+        throw (Error) cause;
+      } else {
+        throw new RuntimeException(e1);
+      }
+    } catch (CancellationException ce) {
+      throw new InterruptedException(
+        "Blocked callable interrupted by rotation event");
+    } catch (InterruptedException ex) {
+      LOG.warn("Unexpected Exception " + ex.getMessage(), ex);
+      throw ex;
+    }
+  }

 }
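
A note on the semantics of callWithTimeout above: with callTimeout > 0 the wait is bounded and a TimeoutException becomes an IOException, so the sink's existing failure handling applies; with callTimeout <= 0 the old unbounded blocking behavior is preserved. Exceptions thrown inside the callable are unwrapped from the ExecutionException and rethrown as their original type, and a cancelled callable surfaces as an InterruptedException.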
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (Revision 76e3d1f)
flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java (Revision ebe277c)
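
The HDFSEventSink and TestBucketWriter diffs are not expanded above. HDFSEventSink is where callTimeout and callTimeoutPool would be read from configuration and wired into each BucketWriter; the hdfs.callTimeout setting is referenced by BucketWriter's own error message, but the sketch below is an assumption about that wiring, not the actual patch:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Hypothetical wiring on the sink side (names are illustrative).
    class CallTimeoutWiringSketch {
      private long callTimeout;
      private ExecutorService callTimeoutPool;

      void configure(long configuredTimeoutMs, int poolSize) {
        callTimeout = configuredTimeoutMs;                         // e.g. hdfs.callTimeout
        callTimeoutPool = Executors.newFixedThreadPool(poolSize);  // e.g. hdfs.threadsPoolSize
      }

      void stop() throws InterruptedException {
        callTimeoutPool.shutdown();                     // stop accepting new calls
        callTimeoutPool.awaitTermination(10, TimeUnit.SECONDS);
        callTimeoutPool.shutdownNow();                  // interrupt any stragglers
      }
    }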