Review Board 1.7.22


FLUME-1794: FileChannel check for full disks in the background

Review Request #8672 - Created Dec. 18, 2012 and submitted

Brock Noland
trunk
FLUME-1794
Reviewers
Flume
flume-git
Implements caching of usable space.  The cache is refreshed every 15 seconds.  Also cleans up a few compiler warnings in file channel.
Unit test added and passes
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
Revision af11dc5 New Change
[20] 109 lines
[+20] [+] class Log {
110
  private boolean useLogReplayV1;
110
  private boolean useLogReplayV1;
111
  private KeyProvider encryptionKeyProvider;
111
  private KeyProvider encryptionKeyProvider;
112
  private String encryptionCipherProvider;
112
  private String encryptionCipherProvider;
113
  private String encryptionKeyAlias;
113
  private String encryptionKeyAlias;
114
  private Key encryptionKey;
114
  private Key encryptionKey;

    
   
115
  private final long usableSpaceRefreshInterval;
115

    
   
116

   
116
  static class Builder {
117
  static class Builder {
117
    private long bCheckpointInterval;
118
    private long bCheckpointInterval;
118
    private long bMinimumRequiredSpace;
119
    private long bMinimumRequiredSpace;
119
    private long bMaxFileSize;
120
    private long bMaxFileSize;
[+20] [20] 8 lines
[+20] static class Builder {
128
    private boolean useLogReplayV1;
129
    private boolean useLogReplayV1;
129
    private boolean useFastReplay;
130
    private boolean useFastReplay;
130
    private KeyProvider bEncryptionKeyProvider;
131
    private KeyProvider bEncryptionKeyProvider;
131
    private String bEncryptionKeyAlias;
132
    private String bEncryptionKeyAlias;
132
    private String bEncryptionCipherProvider;
133
    private String bEncryptionCipherProvider;

    
   
134
    private long bUsableSpaceRefreshInterval = 15L * 1000L;

    
   
135

   

    
   
136
    Builder setUsableSpaceRefreshInterval(long usableSpaceRefreshInterval) {

    
   
137
      bUsableSpaceRefreshInterval = usableSpaceRefreshInterval;

    
   
138
      return this;

    
   
139
    }
133

    
   
140

   
134
    Builder setCheckpointInterval(long interval) {
141
    Builder setCheckpointInterval(long interval) {
135
      bCheckpointInterval = interval;
142
      bCheckpointInterval = interval;
136
      return this;
143
      return this;
137
    }
144
    }
[+20] [20] 66 lines
[+20] static class Builder {
204
    Log build() throws IOException {
211
    Log build() throws IOException {
205
      return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
212
      return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
206
          bLogWriteTimeout, bCheckpointWriteTimeout, bCheckpointDir, bName,
213
          bLogWriteTimeout, bCheckpointWriteTimeout, bCheckpointDir, bName,
207
          useLogReplayV1, useFastReplay, bMinimumRequiredSpace,
214
          useLogReplayV1, useFastReplay, bMinimumRequiredSpace,
208
          bEncryptionKeyProvider, bEncryptionKeyAlias,
215
          bEncryptionKeyProvider, bEncryptionKeyAlias,
209
          bEncryptionCipherProvider, bLogDirs);
216
          bEncryptionCipherProvider, bUsableSpaceRefreshInterval,

    
   
217
          bLogDirs);
210
    }
218
    }
211
  }
219
  }
212

    
   
220

   
213
  private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
221
  private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
214
      int logWriteTimeout, int checkpointWriteTimeout, File checkpointDir,
222
      int logWriteTimeout, int checkpointWriteTimeout, File checkpointDir,
215
      String name, boolean useLogReplayV1, boolean useFastReplay,
223
      String name, boolean useLogReplayV1, boolean useFastReplay,
216
      long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
224
      long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
217
      @Nullable String encryptionKeyAlias,
225
      @Nullable String encryptionKeyAlias,
218
      @Nullable String encryptionCipherProvider, File... logDirs)
226
      @Nullable String encryptionCipherProvider,

    
   
227
      long usableSpaceRefreshInterval, File... logDirs)
219
          throws IOException {
228
          throws IOException {
220
    Preconditions.checkArgument(checkpointInterval > 0,
229
    Preconditions.checkArgument(checkpointInterval > 0,
221
        "checkpointInterval <= 0");
230
        "checkpointInterval <= 0");
222
    Preconditions.checkArgument(queueCapacity > 0, "queueCapacity <= 0");
231
    Preconditions.checkArgument(queueCapacity > 0, "queueCapacity <= 0");
223
    Preconditions.checkArgument(maxFileSize > 0, "maxFileSize <= 0");
232
    Preconditions.checkArgument(maxFileSize > 0, "maxFileSize <= 0");
224
    Preconditions.checkNotNull(checkpointDir, "checkpointDir");
233
    Preconditions.checkNotNull(checkpointDir, "checkpointDir");

    
   
234
    Preconditions.checkArgument(usableSpaceRefreshInterval > 0,

    
   
235
        "usableSpaceRefreshInterval <= 0");
225
    Preconditions.checkArgument(
236
    Preconditions.checkArgument(
226
        checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
237
        checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
227
            + checkpointDir + " could not be created");
238
            + checkpointDir + " could not be created");
228
    Preconditions.checkNotNull(logDirs, "logDirs");
239
    Preconditions.checkNotNull(logDirs, "logDirs");
229
    Preconditions.checkArgument(logDirs.length > 0, "logDirs empty");
240
    Preconditions.checkArgument(logDirs.length > 0, "logDirs empty");
230
    Preconditions.checkArgument(name != null && !name.trim().isEmpty(),
241
    Preconditions.checkArgument(name != null && !name.trim().isEmpty(),
231
            "channel name should be specified");
242
            "channel name should be specified");
232

    
   
243

   
233
    this.channelNameDescriptor = "[channel=" + name + "]";
244
    this.channelNameDescriptor = "[channel=" + name + "]";
234
    this.useLogReplayV1 = useLogReplayV1;
245
    this.useLogReplayV1 = useLogReplayV1;
235
    this.useFastReplay = useFastReplay;
246
    this.useFastReplay = useFastReplay;
236
    this.minimumRequiredSpace = minimumRequiredSpace;
247
    this.minimumRequiredSpace = minimumRequiredSpace;

    
   
248
    this.usableSpaceRefreshInterval = usableSpaceRefreshInterval;
237
    for (File logDir : logDirs) {
249
    for (File logDir : logDirs) {
238
      Preconditions.checkArgument(logDir.isDirectory() || logDir.mkdirs(),
250
      Preconditions.checkArgument(logDir.isDirectory() || logDir.mkdirs(),
239
          "LogDir " + logDir + " could not be created");
251
          "LogDir " + logDir + " could not be created");
240
    }
252
    }
241
    locks = Maps.newHashMap();
253
    locks = Maps.newHashMap();
[+20] [20] 48 lines
[+20] static class Builder {
290
  /**
302
  /**
291
   * Read checkpoint and data files from disk replaying them to the state
303
   * Read checkpoint and data files from disk replaying them to the state
292
   * directly before the shutdown or crash.
304
   * directly before the shutdown or crash.
293
   * @throws IOException
305
   * @throws IOException
294
   */
306
   */
295
  @SuppressWarnings("deprecation")

   
296
  void replay() throws IOException {
307
  void replay() throws IOException {
297
    Preconditions.checkState(!open, "Cannot replay after Log has been opened");
308
    Preconditions.checkState(!open, "Cannot replay after Log has been opened");
298

    
   
309

   
299
    Preconditions.checkState(tryLockExclusive(), "Cannot obtain lock on "
310
    Preconditions.checkState(tryLockExclusive(), "Cannot obtain lock on "
300
        + channelNameDescriptor);
311
        + channelNameDescriptor);
[+20] [20] 103 lines
[+20] static class Builder {
404
    } finally {
415
    } finally {
405
      unlockExclusive();
416
      unlockExclusive();
406
    }
417
    }
407
  }
418
  }
408

    
   
419

   

    
   
420
  @SuppressWarnings("deprecation")
409
  private void doReplay(FlumeEventQueue queue, List<File> dataFiles,
421
  private void doReplay(FlumeEventQueue queue, List<File> dataFiles,
410
          KeyProvider encryptionKeyProvider) throws Exception {
422
          KeyProvider encryptionKeyProvider) throws Exception {
411
    CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles,
423
    CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles,
412
            queue);
424
            queue);
413
    if (useFastReplay && rebuilder.rebuild()) {
425
    if (useFastReplay && rebuilder.rebuild()) {
[+20] [20] 385 lines
[+20] [+] private synchronized void roll(int index, ByteBuffer buffer)
799
          LOGGER.info("Roll start " + logDirs[index]);
811
          LOGGER.info("Roll start " + logDirs[index]);
800
          int fileID = nextFileID.incrementAndGet();
812
          int fileID = nextFileID.incrementAndGet();
801
          File file = new File(logDirs[index], PREFIX + fileID);
813
          File file = new File(logDirs[index], PREFIX + fileID);
802
          LogFile.Writer writer = LogFileFactory.getWriter(file, fileID,
814
          LogFile.Writer writer = LogFileFactory.getWriter(file, fileID,
803
              maxFileSize, encryptionKey, encryptionKeyAlias,
815
              maxFileSize, encryptionKey, encryptionKeyAlias,
804
              encryptionCipherProvider);
816
              encryptionCipherProvider, usableSpaceRefreshInterval);
805
          idLogFileMap.put(fileID, LogFileFactory.getRandomReader(file,
817
          idLogFileMap.put(fileID, LogFileFactory.getRandomReader(file,
806
              encryptionKeyProvider));
818
              encryptionKeyProvider));
807
          // writer from this point on will get new reference
819
          // writer from this point on will get new reference
808
          logFiles.set(index, writer);
820
          logFiles.set(index, writer);
809
          // close out old log
821
          // close out old log
[+20] [20] 209 lines
[+20] [+] private void unlock(File dir) throws IOException {
1019
  }
1031
  }
1020
  static class BackgroundWorker implements Runnable {
1032
  static class BackgroundWorker implements Runnable {
1021
    private static final Logger LOG = LoggerFactory
1033
    private static final Logger LOG = LoggerFactory
1022
        .getLogger(BackgroundWorker.class);
1034
        .getLogger(BackgroundWorker.class);
1023
    private final Log log;
1035
    private final Log log;
1024
    private volatile boolean run = true;

   
1025

    
   
1036

   
1026
    public BackgroundWorker(Log log) {
1037
    public BackgroundWorker(Log log) {
1027
      this.log = log;
1038
      this.log = log;
1028
    }
1039
    }
1029

    
   
1040

   
[+20] [20] 14 lines
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
Revision 8089ff3 New Change
 
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
Revision 1fe219a New Change
 
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
Revision 4c593a4 New Change
 
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
Revision aac7805 New Change
 
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
Revision 3d5bf59 New Change
 
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
Revision f9dbba5 New Change
 
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
Revision 9e28599 New Change
 
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
Revision 274ee7b New Change
 
  1. flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java: Loading...
  2. flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java: Loading...
  3. flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java: Loading...
  4. flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java: Loading...
  5. flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java: Loading...
  6. flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java: Loading...
  7. flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java: Loading...
  8. flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java: Loading...
  9. flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java: Loading...