Review Board 1.7.22


FLUME-1800: Docs for spooling source durability changes

Review Request #9077 - Created Jan. 24, 2013 and submitted

Mike Percy
FLUME-1800
Reviewers
Flume
flume-git
Mostly docs. Changed a couple params that were confusing. Also exposed inputCharset which was previously implemented but not available via source config.
Unit tests pass.
flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
Revision b19d0ea New Change
[20] 20 lines
[+20]
21

    
   
21

   
22
import java.io.File;
22
import java.io.File;
23
import java.io.FileFilter;
23
import java.io.FileFilter;
24
import java.io.FileNotFoundException;
24
import java.io.FileNotFoundException;
25
import java.io.IOException;
25
import java.io.IOException;

    
   
26
import java.nio.charset.Charset;
26
import java.util.*;
27
import java.util.*;
27
import java.util.regex.Pattern;
28
import java.util.regex.Pattern;
28

    
   
29

   
29
import org.apache.flume.Context;
30
import org.apache.flume.Context;
30
import org.apache.flume.Event;
31
import org.apache.flume.Event;
[+20] [20] 51 lines
[+20] [+] public class ReliableSpoolingFileEventReader implements ReliableEventReader {
82
  private final Pattern ignorePattern;
83
  private final Pattern ignorePattern;
83
  private final File metaFile;
84
  private final File metaFile;
84
  private final boolean annotateFileName;
85
  private final boolean annotateFileName;
85
  private final String fileNameHeader;
86
  private final String fileNameHeader;
86
  private final String deletePolicy;
87
  private final String deletePolicy;

    
   
88
  private final Charset inputCharset;
87

    
   
89

   
88
  private Optional<FileInfo> currentFile = Optional.absent();
90
  private Optional<FileInfo> currentFile = Optional.absent();
89
  /** Always contains the last file from which lines have been read. **/
91
  /** Always contains the last file from which lines have been read. **/
90
  private Optional<FileInfo> lastFileRead = Optional.absent();
92
  private Optional<FileInfo> lastFileRead = Optional.absent();
91
  private boolean committed = true;
93
  private boolean committed = true;
92

    
   
94

   
93
  /**
95
  /**
94
   * Create a ReliableSpoolingFileEventReader to watch the given directory.
96
   * Create a ReliableSpoolingFileEventReader to watch the given directory.
95
   */
97
   */
96
  private ReliableSpoolingFileEventReader(File spoolDirectory,
98
  private ReliableSpoolingFileEventReader(File spoolDirectory,
97
      String completedSuffix, String ignorePattern, String trackerDirPath,
99
      String completedSuffix, String ignorePattern, String trackerDirPath,
98
      boolean annotateFileName, String fileNameHeader,
100
      boolean annotateFileName, String fileNameHeader,
99
      String deserializerType, Context deserializerContext,
101
      String deserializerType, Context deserializerContext,
100
      String deletePolicy) throws IOException {
102
      String deletePolicy, String inputCharset) throws IOException {
101

    
   
103

   
102
    // Sanity checks
104
    // Sanity checks
103
    Preconditions.checkNotNull(spoolDirectory);
105
    Preconditions.checkNotNull(spoolDirectory);
104
    Preconditions.checkNotNull(completedSuffix);
106
    Preconditions.checkNotNull(completedSuffix);
105
    Preconditions.checkNotNull(ignorePattern);
107
    Preconditions.checkNotNull(ignorePattern);
106
    Preconditions.checkNotNull(trackerDirPath);
108
    Preconditions.checkNotNull(trackerDirPath);
107
    Preconditions.checkNotNull(deserializerType);
109
    Preconditions.checkNotNull(deserializerType);
108
    Preconditions.checkNotNull(deserializerContext);
110
    Preconditions.checkNotNull(deserializerContext);
109
    Preconditions.checkNotNull(deletePolicy);
111
    Preconditions.checkNotNull(deletePolicy);

    
   
112
    Preconditions.checkNotNull(inputCharset);
110

    
   
113

   
111
    // validate delete policy
114
    // validate delete policy
112
    if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name()) &&
115
    if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name()) &&
113
        !deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) {
116
        !deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) {
114
      throw new IllegalArgumentException("Delete policies other than " +
117
      throw new IllegalArgumentException("Delete policies other than " +
[+20] [20] 32 lines
[+20] public class ReliableSpoolingFileEventReader implements ReliableEventReader {
147
    this.deserializerContext = deserializerContext;
150
    this.deserializerContext = deserializerContext;
148
    this.annotateFileName = annotateFileName;
151
    this.annotateFileName = annotateFileName;
149
    this.fileNameHeader = fileNameHeader;
152
    this.fileNameHeader = fileNameHeader;
150
    this.ignorePattern = Pattern.compile(ignorePattern);
153
    this.ignorePattern = Pattern.compile(ignorePattern);
151
    this.deletePolicy = deletePolicy;
154
    this.deletePolicy = deletePolicy;

    
   
155
    this.inputCharset = Charset.forName(inputCharset);
152

    
   
156

   
153
    File trackerDirectory = new File(trackerDirPath);
157
    File trackerDirectory = new File(trackerDirPath);
154

    
   
158

   
155
    // if relative path, treat as relative to spool directory
159
    // if relative path, treat as relative to spool directory
156
    if (!trackerDirectory.isAbsolute()) {
160
    if (!trackerDirectory.isAbsolute()) {
[+20] [20] 263 lines
[+20] [+] public int compare(File a, File b) {
420
        Preconditions.checkState(tracker.getTarget().equals(nextPath),
424
        Preconditions.checkState(tracker.getTarget().equals(nextPath),
421
            "Tracker target %s does not equal expected filename %s",
425
            "Tracker target %s does not equal expected filename %s",
422
            tracker.getTarget(), nextPath);
426
            tracker.getTarget(), nextPath);
423

    
   
427

   
424
        ResettableInputStream in =
428
        ResettableInputStream in =
425
            new ResettableFileInputStream(nextFile, tracker);
429
            new ResettableFileInputStream(nextFile, tracker,

    
   
430
                ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset);
426
        EventDeserializer deserializer = EventDeserializerFactory.getInstance
431
        EventDeserializer deserializer = EventDeserializerFactory.getInstance
427
            (deserializerType, deserializerContext, in);
432
            (deserializerType, deserializerContext, in);
428

    
   
433

   
429
        return Optional.of(new FileInfo(nextFile, deserializer));
434
        return Optional.of(new FileInfo(nextFile, deserializer));
430
      } catch (FileNotFoundException e) {
435
      } catch (FileNotFoundException e) {
[+20] [20] 49 lines
[+20] [+] public static class Builder {
480
    private String completedSuffix =
485
    private String completedSuffix =
481
        SpoolDirectorySourceConfigurationConstants.SPOOLED_FILE_SUFFIX;
486
        SpoolDirectorySourceConfigurationConstants.SPOOLED_FILE_SUFFIX;
482
    private String ignorePattern =
487
    private String ignorePattern =
483
        SpoolDirectorySourceConfigurationConstants.DEFAULT_IGNORE_PAT;
488
        SpoolDirectorySourceConfigurationConstants.DEFAULT_IGNORE_PAT;
484
    private String trackerDirPath =
489
    private String trackerDirPath =
485
        SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR;
490
        SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
486
    private Boolean annotateFileName =
491
    private Boolean annotateFileName =
487
        SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER;
492
        SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER;
488
    private String fileNameHeader =
493
    private String fileNameHeader =
489
        SpoolDirectorySourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;
494
        SpoolDirectorySourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;
490
    private String deserializerType =
495
    private String deserializerType =
491
        SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER;
496
        SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER;
492
    private Context deserializerContext = new Context();
497
    private Context deserializerContext = new Context();
493
    private String deletePolicy =
498
    private String deletePolicy =
494
        SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY;
499
        SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY;

    
   
500
    private String inputCharset =

    
   
501
        SpoolDirectorySourceConfigurationConstants.DEFAULT_INPUT_CHARSET;
495

    
   
502

   
496
    public Builder spoolDirectory(File directory) {
503
    public Builder spoolDirectory(File directory) {
497
      this.spoolDirectory = directory;
504
      this.spoolDirectory = directory;
498
      return this;
505
      return this;
499
    }
506
    }
[+20] [20] 36 lines
[+20] [+] public Builder deserializerContext(Context deserializerContext) {
536
    public Builder deletePolicy(String deletePolicy) {
543
    public Builder deletePolicy(String deletePolicy) {
537
      this.deletePolicy = deletePolicy;
544
      this.deletePolicy = deletePolicy;
538
      return this;
545
      return this;
539
    }
546
    }
540

    
   
547

   

    
   
548
    public Builder inputCharset(String inputCharset) {

    
   
549
      this.inputCharset = inputCharset;

    
   
550
      return this;

    
   
551
    }

    
   
552

   
541
    public ReliableSpoolingFileEventReader build() throws IOException {
553
    public ReliableSpoolingFileEventReader build() throws IOException {
542
      return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
554
      return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
543
          ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
555
          ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
544
          deserializerType, deserializerContext, deletePolicy);
556
          deserializerType, deserializerContext, deletePolicy, inputCharset);
545
    }
557
    }
546
  }
558
  }
547

    
   
559

   
548
}
560
}
flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
Revision 552bd48 New Change
 
flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
Revision afc7288 New Change
 
flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
Revision a29606e New Change
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision aa92974 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java: Loading...
  3. flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java: Loading...
  4. flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java: Loading...
  5. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...