Review Board 1.7.22


FLUME-2157. Spool directory source does not shut down correctly when Flume is reconfigured.

Review Request #13463 - Created Aug. 10, 2013 and updated

Mike Percy
FLUME-2157
Reviewers
Flume
flume-git
Spool directory source does not shut down correctly when Flume is reconfigured. This can lead to all sorts of problems stemming from the fact that we have threads running which should not be. In the logs, it looks like multiple spool directory sources are running on the same directory, even though the config may say otherwise.
Added a unit test for this specific case and also an integration test that simply runs an agent with many concurrent spool directory sources.
flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
Revision f82fe1f New Change
[20] 130 lines
[+20] [+] public class ReliableSpoolingFileEventReader implements ReliableEventReader {
131
    Preconditions.checkState(spoolDirectory.isDirectory(),
131
    Preconditions.checkState(spoolDirectory.isDirectory(),
132
        "Path is not a directory: " + spoolDirectory.getAbsolutePath());
132
        "Path is not a directory: " + spoolDirectory.getAbsolutePath());
133

    
   
133

   
134
    // Do a canary test to make sure we have access to spooling directory
134
    // Do a canary test to make sure we have access to spooling directory
135
    try {
135
    try {
136
      File f1 = File.createTempFile("flume", "test", spoolDirectory);
136
      File canary = File.createTempFile("flume-spooldir-perm-check-", ".canary",
137
      Files.write("testing flume file permissions\n", f1, Charsets.UTF_8);
137
          spoolDirectory);
138
      Files.readLines(f1, Charsets.UTF_8);
138
      Files.write("testing flume file permissions\n", canary, Charsets.UTF_8);
139
      if (!f1.delete()) {
139
      List<String> lines = Files.readLines(canary, Charsets.UTF_8);
140
        throw new IOException("Unable to delete canary file " + f1);
140
      Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s", canary);

    
   
141
      if (!canary.delete()) {

    
   
142
        throw new IOException("Unable to delete canary file " + canary);
141
      }
143
      }

    
   
144
      logger.debug("Successfully created and deleted canary file: {}", canary);
142
    } catch (IOException e) {
145
    } catch (IOException e) {
143
      throw new FlumeException("Unable to read and modify files" +
146
      throw new FlumeException("Unable to read and modify files" +
144
          " in the spooling directory: " + spoolDirectory, e);
147
          " in the spooling directory: " + spoolDirectory, e);
145
    }
148
    }
146

    
   
149

   
[+20] [20] 416 lines
flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
Revision 7145580 New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
Revision 652d2a2 New Change
 
flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSpooldirSource.java
New File
 
flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
Revision bc58340 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java: Loading...
  3. flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java: Loading...
  4. flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSpooldirSource.java: Loading...
  5. flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java: Loading...