Review Board 1.7.22


FLUME-2027: Check for default replication fails on federated cluster in hdfs sink

Review Request #10834 - Created April 29, 2013 and updated

Mike Percy
FLUME-2027
Reviewers
Flume
flume-git
Federated HDFS has a new API which has been ported to Hadoop 1.1.0 and Hadoop 2.0.0-alpha from HADOOP-8014 which takes a Path argument to the FileSystem.getDefaultReplication() call. This patch simply moves the existing implementation over to that new API.
Unit tests pass.

Tested this by running the unit tests under both Hadoop 1.0.1 and Hadoop 1.1.2 (manually modified the pom files) and looking at the DEBUG output from the unit tests. The debug output indicated that HADOOP-8014 is used if it's available, otherwise the existing API is used. This should be sufficient for all cases.
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
Revision ff4f223 New Change
[20] 20 lines
[+20]
21
import org.apache.flume.Context;
21
import org.apache.flume.Context;
22
import org.apache.flume.annotations.InterfaceAudience;
22
import org.apache.flume.annotations.InterfaceAudience;
23
import org.apache.flume.annotations.InterfaceStability;
23
import org.apache.flume.annotations.InterfaceStability;
24
import org.apache.hadoop.fs.FSDataOutputStream;
24
import org.apache.hadoop.fs.FSDataOutputStream;
25
import org.apache.hadoop.fs.FileSystem;
25
import org.apache.hadoop.fs.FileSystem;

    
   
26
import org.apache.hadoop.fs.Path;
26
import org.slf4j.Logger;
27
import org.slf4j.Logger;
27
import org.slf4j.LoggerFactory;
28
import org.slf4j.LoggerFactory;
28

    
   
29

   
29
import java.io.OutputStream;
30
import java.io.OutputStream;
30
import java.lang.reflect.InvocationTargetException;
31
import java.lang.reflect.InvocationTargetException;
[+20] [20] 6 lines
[+20]
37
  private static final Logger logger =
38
  private static final Logger logger =
38
      LoggerFactory.getLogger(AbstractHDFSWriter.class);
39
      LoggerFactory.getLogger(AbstractHDFSWriter.class);
39

    
   
40

   
40
  private FSDataOutputStream outputStream;
41
  private FSDataOutputStream outputStream;
41
  private FileSystem fs;
42
  private FileSystem fs;

    
   
43
  private Path destPath;
42
  private Method refGetNumCurrentReplicas = null;
44
  private Method refGetNumCurrentReplicas = null;

    
   
45
  private Method refGetDefaultReplication = null;
43
  private Integer configuredMinReplicas = null;
46
  private Integer configuredMinReplicas = null;
44

    
   
47

   
45
  final static Object [] NO_ARGS = new Object []{};
48
  final static Object [] NO_ARGS = new Object []{};
46

    
   
49

   
47
  @Override
50
  @Override
[+20] [20] 34 lines
[+20] [+] public boolean isUnderReplicated() {
82
    }
85
    }
83
    return false;
86
    return false;
84
  }
87
  }
85

    
   
88

   
86
  protected void registerCurrentStream(FSDataOutputStream outputStream,
89
  protected void registerCurrentStream(FSDataOutputStream outputStream,
87
                                      FileSystem fs) {
90
                                      FileSystem fs, Path destPath) {
88
    Preconditions.checkNotNull(outputStream, "outputStream must not be null");
91
    Preconditions.checkNotNull(outputStream, "outputStream must not be null");
89
    Preconditions.checkNotNull(fs, "fs must not be null");
92
    Preconditions.checkNotNull(fs, "fs must not be null");

    
   
93
    Preconditions.checkNotNull(destPath, "destPath must not be null");
90

    
   
94

   
91
    this.outputStream = outputStream;
95
    this.outputStream = outputStream;
92
    this.fs = fs;
96
    this.fs = fs;

    
   
97
    this.destPath = destPath;
93
    this.refGetNumCurrentReplicas = reflectGetNumCurrentReplicas(outputStream);
98
    this.refGetNumCurrentReplicas = reflectGetNumCurrentReplicas(outputStream);

    
   
99
    this.refGetDefaultReplication = reflectGetDefaultReplication(fs);
94
  }
100
  }
95

    
   
101

   
96
  protected void unregisterCurrentStream() {
102
  protected void unregisterCurrentStream() {
97
    this.outputStream = null;
103
    this.outputStream = null;
98
    this.fs = null;
104
    this.fs = null;

    
   
105
    this.destPath = null;
99
    this.refGetNumCurrentReplicas = null;
106
    this.refGetNumCurrentReplicas = null;

    
   
107
    this.refGetDefaultReplication = null;
100
  }
108
  }
101

    
   
109

   
102
  public int getFsDesiredReplication() {
110
  public int getFsDesiredReplication() {
103
    if (fs != null) {
111
    short replication = 0;
104
      return fs.getDefaultReplication();
112
    if (fs != null && destPath != null) {

    
   
113
      if (refGetDefaultReplication != null) {

    
   
114
        try {

    
   
115
          replication = (Short) refGetDefaultReplication.invoke(fs, destPath);

    
   
116
        } catch (IllegalAccessException e) {

    
   
117
          logger.warn("Unexpected error calling getDefaultReplication(Path)", e);

    
   
118
        } catch (InvocationTargetException e) {

    
   
119
          logger.warn("Unexpected error calling getDefaultReplication(Path)", e);

    
   
120
        }

    
   
121
      } else {

    
   
122
        // will not work on Federated HDFS (see HADOOP-8014)

    
   
123
        replication = fs.getDefaultReplication();
105
    }
124
      }
106
    return 0;

   
107
  }
125
    }

    
   
126
    return replication;

    
   
127
  }
108

    
   
128

   
109
  /**
129
  /**
110
   * This method gets the datanode replication count for the current open file.
130
   * This method gets the datanode replication count for the current open file.
111
   *
131
   *
112
   * If the pipeline isn't started yet or is empty, you will get the default
132
   * If the pipeline isn't started yet or is empty, you will get the default
[+20] [20] 48 lines
[+20] [+] private Method reflectGetNumCurrentReplicas(FSDataOutputStream os) {
161
      logger.debug("Using getNumCurrentReplicas--HDFS-826");
181
      logger.debug("Using getNumCurrentReplicas--HDFS-826");
162
    }
182
    }
163
    return m;
183
    return m;
164
  }
184
  }
165

    
   
185

   

    
   
186
  /**

    
   
187
   * Find the 'getDefaultReplication' method on the passed <code>fs</code>

    
   
188
   * FileSystem that takes a Path argument.

    
   
189
   * @return Method or null.

    
   
190
   */

    
   
191
  private Method reflectGetDefaultReplication(FileSystem fileSystem) {

    
   
192
    Method m = null;

    
   
193
    if (fileSystem != null) {

    
   
194
      Class<?> fsClass = fileSystem.getClass();

    
   
195
      try {

    
   
196
        m = fsClass.getMethod("getDefaultReplication",

    
   
197
            new Class<?>[] { Path.class });

    
   
198
      } catch (NoSuchMethodException e) {

    
   
199
        logger.debug("FileSystem implementation doesn't support"

    
   
200
            + " getDefaultReplication(Path); -- HADOOP-8014 not available; " +

    
   
201
            "className = " + fsClass.getName() + "; err = " + e);

    
   
202
      } catch (SecurityException e) {

    
   
203
        logger.debug("No access to getDefaultReplication(Path) on "

    
   
204
            + "FileSystem implementation -- HADOOP-8014 not available; " +

    
   
205
            "className = " + fsClass.getName() + "; err = " + e);

    
   
206
      }

    
   
207
    }

    
   
208
    if (m != null) {

    
   
209
      logger.debug("Using FileSystem.getDefaultReplication(Path) from " +

    
   
210
          "HADOOP-8014");

    
   
211
    }

    
   
212
    return m;

    
   
213
  }

    
   
214

   
166
}
215
}
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
Revision 0c618b5 New Change
 
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
Revision c87fafe New Change
 
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
Revision 1a401d6 New Change
 
  1. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java: Loading...
  2. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java: Loading...
  3. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java: Loading...
  4. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java: Loading...