Review Board 1.7.22


HDFS Sink should check if file is closed and retry if it is not.

Review Request #11583 - Created May 31, 2013 and updated

Ted Malaska
trunk
2007
Reviewers
Flume
flume-git
We can use the new API added in HDFS-4525. We will need to use reflection though, so we can run against a version of HDFS which does not have this API.


 
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
Revision bc3b383 New Change
[20] 24 lines
[+20]
25
import org.apache.hadoop.fs.FileSystem;
25
import org.apache.hadoop.fs.FileSystem;
26
import org.apache.hadoop.fs.Path;
26
import org.apache.hadoop.fs.Path;
27
import org.slf4j.Logger;
27
import org.slf4j.Logger;
28
import org.slf4j.LoggerFactory;
28
import org.slf4j.LoggerFactory;
29

    
   
29

   

    
   
30
import java.io.IOException;
30
import java.io.OutputStream;
31
import java.io.OutputStream;
31
import java.lang.reflect.InvocationTargetException;
32
import java.lang.reflect.InvocationTargetException;
32
import java.lang.reflect.Method;
33
import java.lang.reflect.Method;
33

    
   
34

   
34
@InterfaceAudience.Private
35
@InterfaceAudience.Private
[+20] [20] 7 lines
[+20]
42
  private FileSystem fs;
43
  private FileSystem fs;
43
  private Path destPath;
44
  private Path destPath;
44
  private Method refGetNumCurrentReplicas = null;
45
  private Method refGetNumCurrentReplicas = null;
45
  private Method refGetDefaultReplication = null;
46
  private Method refGetDefaultReplication = null;
46
  private Integer configuredMinReplicas = null;
47
  private Integer configuredMinReplicas = null;

    
   
48
  private Integer numberOfCloseRetries = null;

    
   
49
  private Integer timeBetweenCloseRetries = null;
47

    
   
50

   
48
  final static Object [] NO_ARGS = new Object []{};
51
  final static Object [] NO_ARGS = new Object []{};
49

    
   
52

   
50
  @Override
53
  @Override
51
  public void configure(Context context) {
54
  public void configure(Context context) {
52
    configuredMinReplicas = context.getInteger("hdfs.minBlockReplicas");
55
    configuredMinReplicas = context.getInteger("hdfs.minBlockReplicas");
53
    if (configuredMinReplicas != null) {
56
    if (configuredMinReplicas != null) {
54
      Preconditions.checkArgument(configuredMinReplicas >= 0,
57
      Preconditions.checkArgument(configuredMinReplicas >= 0,
55
          "hdfs.minBlockReplicas must be greater than or equal to 0");
58
          "hdfs.minBlockReplicas must be greater than or equal to 0");
56
    }
59
    }

    
   
60
    numberOfCloseRetries = context.getInteger("hdfs.close.retries", 3);

    
   
61
    timeBetweenCloseRetries = context.getInteger("hdfs.time.between.close.retries", 60000);
57
  }
62
  }
58

    
   
63

   
59
  /**
64
  /**
60
   * Contract for subclasses: Call registerCurrentStream() on open,
65
   * Contract for subclasses: Call registerCurrentStream() on open,
61
   * unregisterCurrentStream() on close, and the base class takes care of the
66
   * unregisterCurrentStream() on close, and the base class takes care of the
[+20] [20] 148 lines
[+20] [+] private Method reflectGetDefaultReplication(FileSystem fileSystem) {
210
          "HADOOP-8014");
215
          "HADOOP-8014");
211
    }
216
    }
212
    return m;
217
    return m;
213
  }
218
  }
214

    
   
219

   

    
   
220
  /**

    
   
221
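   * Closes the given output stream. When close retries are configured, this then
   * looks up the file system's isFileClosed method by reflection and, while it
   * reports the destination file as still open, waits and attempts the close again
   * until the configured number of retries is used up.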

    
   
222
   * @param outputStream the stream to close

    
   
223
   * @throws IOException if the destination file could not be closed

    
   
224
   */

    
   
225
  protected void closeHDFSOutputStream(OutputStream outputStream)

    
   
226
      throws IOException {

    
   
227
    boolean ableToCloseFileOnFirstTry = true;

    
   
228
    try {

    
   
229
      outputStream.close();

    
   
230
    } catch (IOException e) {

    
   
231
      ableToCloseFileOnFirstTry = false;

    
   
232
      logger.error("Unable to close HDFS file: '" + destPath + "'");

    
   
233
    }

    
   
234
    if (numberOfCloseRetries > 0) {

    
   
235
      try {

    
   
236
        Method isFileClosedMethod = fs.getClass().getMethod("isFileClosed", Path.class);

    
   
237
        int retryCounter = 0;

    
   
238
        while (retryCounter < numberOfCloseRetries.intValue() &&

    
   
239
            Boolean.FALSE.equals(isFileClosedMethod.invoke(fs, destPath))) {

    
   
240
          retryCounter++;

    
   
241
          synchronized(timeBetweenCloseRetries) {

    
   
242
            logger.debug("Waiting: '" + timeBetweenCloseRetries + "' before retry close");

    
   
243
            timeBetweenCloseRetries.wait(timeBetweenCloseRetries);

    
   
244
          }

    
   
245

   

    
   
246
          try {

    
   
247
            outputStream.close();

    
   
248
          } catch (IOException e) {

    
   
249
            logger.error("Unable to close HDFS file: '" + destPath + "'");

    
   
250
          }

    
   
251
        }

    
   
252
        if (retryCounter == numberOfCloseRetries.intValue()) {

    
   
253
          throw new IOException("Failed to close '" + destPath + "' is " +

    
   
254
            numberOfCloseRetries + " retries, over " + (timeBetweenCloseRetries * numberOfCloseRetries) + " millseconds");

    
   
255
        }

    
   
256
      } catch (IOException e) {

    
   
257
        throw e;

    
   
258
      }

    
   
259
      catch (Exception e) {

    
   
260
        if (ableToCloseFileOnFirstTry == false) {

    
   
261
          throw new IOException("Failed to close '" + destPath + "'");

    
   
262
        }

    
   
263
      }

    
   
264
    }

    
   
265
  }

    
   
266

   
215
}
267
}
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
Revision 2c2be6a New Change
 
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
Revision b8214be New Change
 
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
Revision 0383744 New Change
 
flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
Revision ffbdde0 New Change
 
  1. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java: Loading...
  2. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java: Loading...
  3. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java: Loading...
  4. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java: Loading...
  5. flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java: Loading...