Review Board 1.7.22


HDFS Sink should check if file is closed and retry if it is not.

Review Request #11583 - Created May 31, 2013 and updated

Ted Malaska
trunk
2007
Reviewers
Flume
flume-git
We can use the new API added in HDFS-4525. We will need to use reflection though, so we can run against a version of HDFS which does not have this API.


 
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
Revision bc3b383 New Change
[20] 18 lines
[+20]
19

    
   
19

   
20
import com.google.common.base.Preconditions;
20
import com.google.common.base.Preconditions;
21
import org.apache.flume.Context;
21
import org.apache.flume.Context;
22
import org.apache.flume.annotations.InterfaceAudience;
22
import org.apache.flume.annotations.InterfaceAudience;
23
import org.apache.flume.annotations.InterfaceStability;
23
import org.apache.flume.annotations.InterfaceStability;

    
   
24
import org.apache.hadoop.conf.Configuration;
24
import org.apache.hadoop.fs.FSDataOutputStream;
25
import org.apache.hadoop.fs.FSDataOutputStream;
25
import org.apache.hadoop.fs.FileSystem;
26
import org.apache.hadoop.fs.FileSystem;
26
import org.apache.hadoop.fs.Path;
27
import org.apache.hadoop.fs.Path;
27
import org.slf4j.Logger;
28
import org.slf4j.Logger;
28
import org.slf4j.LoggerFactory;
29
import org.slf4j.LoggerFactory;
29

    
   
30

   

    
   
31
import java.io.IOException;
30
import java.io.OutputStream;
32
import java.io.OutputStream;
31
import java.lang.reflect.InvocationTargetException;
33
import java.lang.reflect.InvocationTargetException;
32
import java.lang.reflect.Method;
34
import java.lang.reflect.Method;
33

    
   
35

   
34
@InterfaceAudience.Private
36
@InterfaceAudience.Private
[+20] [20] 7 lines
[+20]
42
  private FileSystem fs;
44
  private FileSystem fs;
43
  private Path destPath;
45
  private Path destPath;
44
  private Method refGetNumCurrentReplicas = null;
46
  private Method refGetNumCurrentReplicas = null;
45
  private Method refGetDefaultReplication = null;
47
  private Method refGetDefaultReplication = null;
46
  private Integer configuredMinReplicas = null;
48
  private Integer configuredMinReplicas = null;

    
   
49
  private Integer numberOfCloseRetries = null;

    
   
50
  long timeBetweenCloseRetries = 30000;
47

    
   
51

   
48
  final static Object [] NO_ARGS = new Object []{};
52
  final static Object [] NO_ARGS = new Object []{};
49

    
   
53

   
50
  @Override
54
  @Override
51
  public void configure(Context context) {
55
  public void configure(Context context) {
52
    configuredMinReplicas = context.getInteger("hdfs.minBlockReplicas");
56
    configuredMinReplicas = context.getInteger("hdfs.minBlockReplicas");
53
    if (configuredMinReplicas != null) {
57
    if (configuredMinReplicas != null) {
54
      Preconditions.checkArgument(configuredMinReplicas >= 0,
58
      Preconditions.checkArgument(configuredMinReplicas >= 0,
55
          "hdfs.minBlockReplicas must be greater than or equal to 0");
59
          "hdfs.minBlockReplicas must be greater than or equal to 0");
56
    }
60
    }

    
   
61
    numberOfCloseRetries = context.getInteger("hdfs.close.retries", 3);
57
  }
62
  }
58

    
   
63

   
59
  /**
64
  /**
60
   * Contract for subclasses: Call registerCurrentStream() on open,
65
   * Contract for subclasses: Call registerCurrentStream() on open,
61
   * unregisterCurrentStream() on close, and the base class takes care of the
66
   * unregisterCurrentStream() on close, and the base class takes care of the
[+20] [20] 33 lines
[+20] [+] protected void registerCurrentStream(FSDataOutputStream outputStream,
95
    this.outputStream = outputStream;
100
    this.outputStream = outputStream;
96
    this.fs = fs;
101
    this.fs = fs;
97
    this.destPath = destPath;
102
    this.destPath = destPath;
98
    this.refGetNumCurrentReplicas = reflectGetNumCurrentReplicas(outputStream);
103
    this.refGetNumCurrentReplicas = reflectGetNumCurrentReplicas(outputStream);
99
    this.refGetDefaultReplication = reflectGetDefaultReplication(fs);
104
    this.refGetDefaultReplication = reflectGetDefaultReplication(fs);

    
   
105

   

    
   
106
    try {

    
   
107
      Configuration conf = fs.getConf();

    
   
108
      if (conf != null) {

    
   
109
        timeBetweenCloseRetries = Long.parseLong(conf.get("hdfs.callTimeout", "30000"));

    
   
110
      }

    
   
111
    } catch (NumberFormatException e) {

    
   
112
      logger.warn("hdfs.callTimeout can not be parsed to a long: " + fs.getConf().get("hdfs.callTimeout"));

    
   
113
    }

    
   
114

   

    
   
115
    if (numberOfCloseRetries > 0) {

    
   
116
      timeBetweenCloseRetries = Math.max(timeBetweenCloseRetries/numberOfCloseRetries, 1000);

    
   
117
    }

    
   
118

   
100
  }
119
  }
101

    
   
120

   
102
  protected void unregisterCurrentStream() {
121
  protected void unregisterCurrentStream() {
103
    this.outputStream = null;
122
    this.outputStream = null;
104
    this.fs = null;
123
    this.fs = null;
[+20] [20] 105 lines
[+20] [+] private Method reflectGetDefaultReplication(FileSystem fileSystem) {
210
          "HADOOP-8014");
229
          "HADOOP-8014");
211
    }
230
    }
212
    return m;
231
    return m;
213
  }
232
  }
214

    
   
233

   

    
   
234
  /**

    
   
235
   * This will

    
   
236
   * @param outputStream

    
   
237
   * @throws IOException

    
   
238
   */

    
   
239
  protected void closeHDFSOutputStream(OutputStream outputStream)

    
   
240
      throws IOException {

    
   
241
    try {

    
   
242
      outputStream.close();

    
   
243

   

    
   
244
      if (numberOfCloseRetries > 0) {

    
   
245
        try {

    
   
246
          Method isFileClosedMethod = getIsFileClosedMethod();

    
   
247
          int retryCounter = 0;

    
   
248
          if (isFileClosedMethod != null) {

    
   
249

   

    
   
250

   

    
   
251

   

    
   
252

   

    
   
253

   

    
   
254
            while (retryCounter < numberOfCloseRetries.intValue() &&

    
   
255
                Boolean.FALSE.equals(isFileClosedMethod.invoke(fs, destPath))) {

    
   
256
              retryCounter++;

    
   
257

   

    
   
258

   

    
   
259
              logger.debug("Waiting: '" + timeBetweenCloseRetries + "' before retry close");

    
   
260
              Thread.sleep(timeBetweenCloseRetries);

    
   
261

   

    
   
262
              try {

    
   
263
                outputStream.close();

    
   
264
              } catch (IOException e) {

    
   
265
                logger.error("Unable to close HDFS file: '" + destPath + "'");

    
   
266
              }

    
   
267
            }

    
   
268
            if (retryCounter == numberOfCloseRetries.intValue()) {

    
   
269
              logger.warn("Failed to close '" + destPath + "' is " +

    
   
270
                numberOfCloseRetries + " retries, over " + (timeBetweenCloseRetries * numberOfCloseRetries) + " millseconds");

    
   
271
            }

    
   
272
          }

    
   
273
        } catch (Exception e) {

    
   
274
          logger.error("Failed to close '" + destPath + "' is " +

    
   
275
              numberOfCloseRetries + " retries, over " + (timeBetweenCloseRetries * numberOfCloseRetries) + " millseconds", e);

    
   
276
        }

    
   
277
      }

    
   
278
    } catch (IOException e) {

    
   
279
      logger.error("Unable to close HDFS file: '" + destPath + "'");

    
   
280
    }

    
   
281
  }

    
   
282

   

    
   
283
  private Method getIsFileClosedMethod() {

    
   
284
    try {

    
   
285
      return fs.getClass().getMethod("isFileClosed", Path.class);

    
   
286
    } catch (Exception e) {

    
   
287
      return null;

    
   
288
    }

    
   
289
  }

    
   
290

   
215
}
291
}
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
Revision 2c2be6a New Change
 
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
Revision b8214be New Change
 
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
Revision 0383744 New Change
 
flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java
New File
 
flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java
New File
 
flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
Revision ffbdde0 New Change
 
  1. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java: Loading...
  2. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java: Loading...
  3. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java: Loading...
  4. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java: Loading...
  5. flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java: Loading...
  6. flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java: Loading...
  7. flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java: Loading...