Review Board 1.7.22


FLUME-1899 - Make the spool directory source work with sub directories

Review Request #12211 - Created July 1, 2013 and updated

Phil Scala
https://issues.apache.org/jira/browse/FLUME-1899
Reviewers
Flume
flume-git
Added recursive directory walking to the ReliableSpoolingFileEventReader. The same ignore pattern is also applied to sub-directory names.
Added a new unit test; tested locally on Windows and CentOS. The setting is not enabled by default; to enable it, update the configuration, e.g. spool-agent.sources.spooler.recursiveDirectorySearch = true
flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
Revision f82fe1f New Change
[20] 79 lines
[+20] [+] public class ReliableSpoolingFileEventReader implements ReliableEventReader {
80
  private final String completedSuffix;
80
  private final String completedSuffix;
81
  private final String deserializerType;
81
  private final String deserializerType;
82
  private final Context deserializerContext;
82
  private final Context deserializerContext;
83
  private final Pattern ignorePattern;
83
  private final Pattern ignorePattern;
84
  private final File metaFile;
84
  private final File metaFile;

    
   
85
  private final boolean recursiveDirectorySearch;
85
  private final boolean annotateFileName;
86
  private final boolean annotateFileName;
86
  private final String fileNameHeader;
87
  private final String fileNameHeader;
87
  private final String deletePolicy;
88
  private final String deletePolicy;
88
  private final Charset inputCharset;
89
  private final Charset inputCharset;
89

    
   
90

   
[+20] [20] 7 lines
[+20] public class ReliableSpoolingFileEventReader implements ReliableEventReader {
97
   */
98
   */
98
  private ReliableSpoolingFileEventReader(File spoolDirectory,
99
  private ReliableSpoolingFileEventReader(File spoolDirectory,
99
      String completedSuffix, String ignorePattern, String trackerDirPath,
100
      String completedSuffix, String ignorePattern, String trackerDirPath,
100
      boolean annotateFileName, String fileNameHeader,
101
      boolean annotateFileName, String fileNameHeader,
101
      String deserializerType, Context deserializerContext,
102
      String deserializerType, Context deserializerContext,
102
      String deletePolicy, String inputCharset) throws IOException {
103
      String deletePolicy, String inputCharset, boolean recursiveDirectorySearch) throws IOException {
103

    
   
104

   
104
    // Sanity checks
105
    // Sanity checks
105
    Preconditions.checkNotNull(spoolDirectory);
106
    Preconditions.checkNotNull(spoolDirectory);
106
    Preconditions.checkNotNull(completedSuffix);
107
    Preconditions.checkNotNull(completedSuffix);
107
    Preconditions.checkNotNull(ignorePattern);
108
    Preconditions.checkNotNull(ignorePattern);
[+20] [20] 43 lines
[+20] public class ReliableSpoolingFileEventReader implements ReliableEventReader {
151
    this.annotateFileName = annotateFileName;
152
    this.annotateFileName = annotateFileName;
152
    this.fileNameHeader = fileNameHeader;
153
    this.fileNameHeader = fileNameHeader;
153
    this.ignorePattern = Pattern.compile(ignorePattern);
154
    this.ignorePattern = Pattern.compile(ignorePattern);
154
    this.deletePolicy = deletePolicy;
155
    this.deletePolicy = deletePolicy;
155
    this.inputCharset = Charset.forName(inputCharset);
156
    this.inputCharset = Charset.forName(inputCharset);

    
   
157
	this.recursiveDirectorySearch = recursiveDirectorySearch;
156

    
   
158

   
157
    File trackerDirectory = new File(trackerDirPath);
159
    File trackerDirectory = new File(trackerDirPath);
158

    
   
160

   
159
    // if relative path, treat as relative to spool directory
161
    // if relative path, treat as relative to spool directory
160
    if (!trackerDirectory.isAbsolute()) {
162
    if (!trackerDirectory.isAbsolute()) {
[+20] [20] 14 lines
[+20] public class ReliableSpoolingFileEventReader implements ReliableEventReader {
175
          trackerDirectory);
177
          trackerDirectory);
176
    }
178
    }
177

    
   
179

   
178
    this.metaFile = new File(trackerDirectory, metaFileName);
180
    this.metaFile = new File(trackerDirectory, metaFileName);
179
  }
181
  }

    
   
182
  

    
   
183
  /** 

    
   
184
   * Filter to exclude files/directories either hidden, finished, or names matching the ignore pattern

    
   
185
   */

    
   
186
  final FileFilter filter = new FileFilter() {

    
   
187
    public boolean accept(File candidate) {

    
   
188
	  if ( candidate.isDirectory() ) {

    
   
189
	    String directoryName = candidate.getName();

    
   
190
	    if ( (! recursiveDirectorySearch) || ( directoryName.startsWith(".")) || ignorePattern.matcher(directoryName).matches())

    
   
191
		{
Moved from 392

    
   
192
          return false;
Moved from 393

    
   
193
        }
Moved from 394

    
   
194
        return true;
Moved from 395

    
   
195
      }

    
   
196
      else{

    
   
197
        String fileName = candidate.getName();

    
   
198
        if ((fileName.endsWith(completedSuffix)) || ( fileName.startsWith(".")) || ignorePattern.matcher(fileName).matches()) {
Moved from 392

    
   
199
              return false;
Moved from 393

    
   
200
          }
Moved from 394

    
   
201
      }
Moved from 395

    
   
202
      return true;
Moved from 396

    
   
203
    }

    
   
204
  };
180

    
   
205

   
181
  /** Return the filename which generated the data from the last successful
206
  /** Return the filename which generated the data from the last successful
182
   * {@link #readEvents(int)} call. Returns null if called before any file
207
   * {@link #readEvents(int)} call. Returns null if called before any file
183
   * contents are read. */
208
   * contents are read. */
184
  public String getLastFileRead() {
209
  public String getLastFileRead() {
[+20] [20] 189 lines
[+20] [+] private void deleteCurrentFile(File fileToDelete) throws IOException {
374
    // now we no longer need the meta file
399
    // now we no longer need the meta file
375
    deleteMetaFile();
400
    deleteMetaFile();
376
  }
401
  }
377

    
   
402

   
378
  /**
403
  /**

    
   
404
   * Recursively gather candidate files

    
   
405
   * @param directory the directory to gather files from

    
   
406
   * @return list of files within the passed in directory

    
   
407
   */

    
   
408
  private List<File> getCandidateFiles(File directory){

    
   
409
    List<File> candidateFiles = new ArrayList<File>();

    
   
410
    if (directory==null || ! directory.isDirectory()){

    
   
411
      return candidateFiles;

    
   
412
    }

    
   
413
    for(File file : directory.listFiles(filter)){

    
   
414
      if (file.isDirectory()) {

    
   
415
        candidateFiles.addAll(getCandidateFiles(file));

    
   
416
      }

    
   
417
      else {

    
   
418
        candidateFiles.add(file);

    
   
419
      }

    
   
420
    }

    
   
421
    return candidateFiles;

    
   
422
  }

    
   
423

   

    
   
424
  /**
379
   * Find and open the oldest file in the chosen directory. If two or more
425
   * Find and open the oldest file in the chosen directory. If two or more
380
   * files are equally old, the file name with lower lexicographical value is
426
   * files are equally old, the file name with lower lexicographical value is
381
   * returned. If the directory is empty, this will return an absent option.
427
   * returned. If the directory is empty, this will return an absent option.
382
   */
428
   */
383
  private Optional<FileInfo> getNextFile() {
429
  private Optional<FileInfo> getNextFile() {
384
    /* Filter to exclude finished or hidden files */
430
    List<File> candidateFiles = getCandidateFiles(spoolDirectory);
385
    FileFilter filter = new FileFilter() {

   
386
      public boolean accept(File candidate) {

   
387
        String fileName = candidate.getName();

   
388
        if ((candidate.isDirectory()) ||

   
389
            (fileName.endsWith(completedSuffix)) ||

   
390
            (fileName.startsWith(".")) ||

   
391
            ignorePattern.matcher(fileName).matches()) {

   
392
          return false;
Moved to 199

   
393
        }
Moved to 200

   
394
        return true;
Moved to 201

   
395
      }
Moved to 202

   
396
    };
Moved to 203

   
397
    List<File> candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter));

   
398
    if (candidateFiles.isEmpty()) {
431
    if (candidateFiles.isEmpty()) {
399
      return Optional.absent();
432
      return Optional.absent();
400
    } else {
433
    } else {
401
      Collections.sort(candidateFiles, new Comparator<File>() {
434
      Collections.sort(candidateFiles, new Comparator<File>() {
402
        public int compare(File a, File b) {
435
        public int compare(File a, File b) {
[+20] [20] 96 lines
[+20] [+] public static class Builder {
499
    private Context deserializerContext = new Context();
532
    private Context deserializerContext = new Context();
500
    private String deletePolicy =
533
    private String deletePolicy =
501
        SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY;
534
        SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY;
502
    private String inputCharset =
535
    private String inputCharset =
503
        SpoolDirectorySourceConfigurationConstants.DEFAULT_INPUT_CHARSET;
536
        SpoolDirectorySourceConfigurationConstants.DEFAULT_INPUT_CHARSET;

    
   
537
    private boolean recursiveDirectorySearch =

    
   
538
        SpoolDirectorySourceConfigurationConstants.DEFAULT_RECURSIVE_DIRECTORY_SEARCH;
504

    
   
539

   
505
    public Builder spoolDirectory(File directory) {
540
    public Builder spoolDirectory(File directory) {
506
      this.spoolDirectory = directory;
541
      this.spoolDirectory = directory;
507
      return this;
542
      return this;
508
    }
543
    }
[+20] [20] 41 lines
[+20] [+] public Builder deletePolicy(String deletePolicy) {
550
    public Builder inputCharset(String inputCharset) {
585
    public Builder inputCharset(String inputCharset) {
551
      this.inputCharset = inputCharset;
586
      this.inputCharset = inputCharset;
552
      return this;
587
      return this;
553
    }
588
    }
554

    
   
589

   

    
   
590
    public Builder recursiveDirectorySearch(boolean recursiveDirectorySearch) {

    
   
591
      this.recursiveDirectorySearch = recursiveDirectorySearch;

    
   
592
	  return this;

    
   
593
    }

    
   
594

   
555
    public ReliableSpoolingFileEventReader build() throws IOException {
595
    public ReliableSpoolingFileEventReader build() throws IOException {
556
      return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
596
      return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
557
          ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
597
          ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
558
          deserializerType, deserializerContext, deletePolicy, inputCharset);
598
          deserializerType, deserializerContext, deletePolicy, inputCharset, recursiveDirectorySearch);
559
    }
599
    }
560
  }
600
  }
561

    
   
601

   
562
}
602
}
flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
Revision 7145580 New Change
 
flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
Revision f3cc703 New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
Revision 652d2a2 New Change
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision f84cda5 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java: Loading...
  3. flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java: Loading...
  4. flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java: Loading...
  5. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...