Review Board 1.7.22


PIG-3223 AvroStorage does not handle comma separated input paths

Review Request #10351 - Created April 8, 2013 and updated

Johnny Zhang
trunk
PIG-3223
Reviewers
pig
pig-git
We want to support comma-separated input paths in AvroStorage, for example:
"test_dir1/test_glob1.avro,test_dir1/test_glob2.avro,test_dir1/test_glob3.avro"
"test_dir1/*, test_dir2/test_glob4.avro, test_dir2/test_glob5.avro"
Added two more test cases to TestAvroStorage.java, and they all pass.
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
Revision 0ac0225 New Change
[20] 36 lines
[+20]
37
import org.apache.hadoop.fs.FileSystem;
37
import org.apache.hadoop.fs.FileSystem;
38
import org.apache.hadoop.fs.Path;
38
import org.apache.hadoop.fs.Path;
39
import org.apache.hadoop.fs.PathFilter;
39
import org.apache.hadoop.fs.PathFilter;
40
import org.apache.hadoop.mapreduce.Job;
40
import org.apache.hadoop.mapreduce.Job;
41
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
41
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    
   
42
import org.apache.pig.LoadFunc;
42
import org.apache.pig.ResourceSchema;
43
import org.apache.pig.ResourceSchema;
43
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
44
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
44
import org.apache.pig.data.DataType;
45
import org.apache.pig.data.DataType;
45
import org.apache.pig.piggybank.storage.avro.AvroStorageLog;
46
import org.apache.pig.piggybank.storage.avro.AvroStorageLog;
46

    
   
47

   
[+20] [20] 63 lines
[+20] [+] public static boolean addInputPaths(String pathString, Job job) throws IOException {
110
      return false;
111
      return false;
111
    }
112
    }
112

    
   
113

   
113
    /**
114
    /**
114
     * Adds all non-hidden directories and subdirectories to set param
115
     * Adds all non-hidden directories and subdirectories to set param

    
   
116
     * it supports comma-separated input paths and glob style path
115
     *
117
     *
116
     * @throws IOException
118
     * @throws IOException
117
     */
119
     */
118
    public static boolean getAllSubDirs(Path path, Configuration conf, Set<Path> paths) throws IOException {
120
    public static boolean getAllSubDirs(Path path, Configuration conf,
119
        FileSystem fs = FileSystem.get(path.toUri(), conf);
121
            Set<Path> paths) throws IOException {
120
        FileStatus[] matchedFiles = fs.globStatus(path, PATH_FILTER);
122
        String[] pathStrs = LoadFunc.getPathStrings(path.toString());

    
   
123
        for (String pathStr : pathStrs) {

    
   
124
            FileSystem fs = FileSystem.get(new Path(pathStr).toUri(), conf);

    
   
125
            FileStatus[] matchedFiles = fs.globStatus(new Path(pathStr), PATH_FILTER);
121
        if (matchedFiles == null || matchedFiles.length == 0) {
126
            if (matchedFiles == null || matchedFiles.length == 0) {
122
            return false;
127
                return false;
123
        }
128
            }
124
        for (FileStatus file : matchedFiles) {
129
            for (FileStatus file : matchedFiles) {

    
   
130
                getAllSubDirsInternal(file, conf, paths, fs);

    
   
131
            }

    
   
132
        }

    
   
133
        return true;

    
   
134
    }

    
   
135

   

    
   
136
    private static void getAllSubDirsInternal(FileStatus file, Configuration conf,

    
   
137
            Set<Path> paths, FileSystem fs) throws IOException {
125
            if (file.isDir()) {
138
        if (file.isDir()) {
126
                for (FileStatus sub : fs.listStatus(file.getPath())) {
139
            for (FileStatus sub : fs.listStatus(file.getPath())) {
127
                    getAllSubDirs(sub.getPath(), conf, paths);
140
                getAllSubDirsInternal(sub, conf, paths, fs);
128
                }
141
            }
129
            } else {
142
        } else {
130
                AvroStorageLog.details("Add input file:" + file);
143
            AvroStorageLog.details("Add input file:" + file);
131
                paths.add(file.getPath());
144
            paths.add(file.getPath());
132
            }
145
        }
133
        }
146
    }
134
        return true;

   
135
    }

   
136

    
   
147

   
137
    /** check whether there is NO directory in the input file (status) list*/
148
    /** check whether there is NO directory in the input file (status) list*/
138
    public static boolean noDir(FileStatus [] ss) {
149
    public static boolean noDir(FileStatus [] ss) {
139
        for (FileStatus s : ss) {
150
        for (FileStatus s : ss) {
140
            if (s.isDir())
151
            if (s.isDir())
[+20] [20] 524 lines
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
Revision bd7a6d2 New Change
 
  1. contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java: Loading...
  2. contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java: Loading...