Review Board 1.7.22


PIG-3223 AvroStorage does not handle comma separated input paths

Review Request #10351 - Created April 8, 2013 and updated

Johnny Zhang
trunk
PIG-3223
Reviewers
pig
pig-git
We want to support comma-separated input paths in AvroStorage, for example:
"test_dir1/test_glob1.avro,test_dir1/test_glob2.avro,test_dir1/test_glob3.avro"
"test_dir1/*, test_dir2/test_glob4.avro, test_dir2/test_glob5.avro"
Added two more test cases to TestAvroStorage.java, and they all pass.

Diff revision 1

This is not the most recent revision of the diff. The latest diff is revision 4. See what's changed.

1 2 3 4
1 2 3 4

  1. contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java: Loading...
  2. contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java: Loading...
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
Revision 0ac0225 New Change
[20] 25 lines
[+20]
26
import java.util.HashSet;
26
import java.util.HashSet;
27
import java.util.LinkedHashMap;
27
import java.util.LinkedHashMap;
28
import java.util.List;
28
import java.util.List;
29
import java.util.Map;
29
import java.util.Map;
30
import java.util.Map.Entry;
30
import java.util.Map.Entry;

    
   
31
import java.util.regex.Matcher;

    
   
32
import java.util.regex.Pattern;
31
import java.util.Set;
33
import java.util.Set;
32
import java.net.URI;
34
import java.net.URI;
33
import org.apache.avro.Schema;
35
import org.apache.avro.Schema;
34
import org.apache.avro.Schema.Field;
36
import org.apache.avro.Schema.Field;
35
import org.apache.hadoop.conf.Configuration;
37
import org.apache.hadoop.conf.Configuration;
[+20] [20] 74 lines
[+20] [+] public static boolean addInputPaths(String pathString, Job job) throws IOException {
110
      return false;
112
      return false;
111
    }
113
    }
112

    
   
114

   
113
    /**
115
    /**
114
     * Adds all non-hidden directories and subdirectories to set param
116
     * Adds all non-hidden directories and subdirectories to set param
115
     *
117
     * it supports comma-separated input paths and glob style path

    
   
118
     * first force check glob

    
   
119
     * if glob style path, go the original route

    
   
120
     * if not, split the paths by comma and pass them to the getAllSubDirsInternal method individually.
116
     * @throws IOException
121
     * @throws IOException
117
     */
122
     */
118
    public static boolean getAllSubDirs(Path path, Configuration conf, Set<Path> paths) throws IOException {
123
    public static boolean getAllSubDirs(Path path, Configuration conf, Set<Path> paths) throws IOException {

    
   
124
        if(isGlob(path)) {

    
   
125
            return getAllSubDirsInternal(path, conf, paths);

    
   
126
        }

    
   
127
        else {

    
   
128
            String[] separatedPathStrings = path.toString().split(",");

    
   
129
            for (String separatedPath : separatedPathStrings) {

    
   
130
                getAllSubDirsInternal(new Path(separatedPath), conf, paths);

    
   
131
            }

    
   
132
            return true;

    
   
133
        }

    
   
134
    }

    
   
135

   

    
   
136
    public static boolean getAllSubDirsInternal(Path path, Configuration conf, Set<Path> paths) throws IOException {
119
        FileSystem fs = FileSystem.get(path.toUri(), conf);
137
        FileSystem fs = FileSystem.get(path.toUri(), conf);
120
        FileStatus[] matchedFiles = fs.globStatus(path, PATH_FILTER);
138
        FileStatus[] matchedFiles = fs.globStatus(path, PATH_FILTER);
121
        if (matchedFiles == null || matchedFiles.length == 0) {
139
        if (matchedFiles == null || matchedFiles.length == 0) {
122
            return false;
140
            return false;
123
        }
141
        }

    
   
142

   
124
        for (FileStatus file : matchedFiles) {
143
        for (FileStatus file : matchedFiles) {
125
            if (file.isDir()) {
144
            if (file.isDir()) {
126
                for (FileStatus sub : fs.listStatus(file.getPath())) {
145
                for (FileStatus sub : fs.listStatus(file.getPath())) {
127
                    getAllSubDirs(sub.getPath(), conf, paths);
146
                    getAllSubDirsInternal(sub.getPath(), conf, paths);
128
                }
147
                }
129
            } else {
148
            } else {
130
                AvroStorageLog.details("Add input file:" + file);
149
                AvroStorageLog.details("Add input file:" + file);
131
                paths.add(file.getPath());
150
                paths.add(file.getPath());
132
            }
151
            }
133
        }
152
        }
134
        return true;
153
        return true;
135
    }
154
    }
136

    
   
155

   

    
   
156
    /**

    
   
157
     * check if the avrostorage input path is a "glob" style input

    
   
158
     */

    
   
159
    public static boolean isGlob(Path path) {

    
   
160
        Pattern p = Pattern.compile("[\\S]*\\{[\\S]+\\}[\\S]*");

    
   
161
        Matcher m = p.matcher(path.toString());

    
   
162
        return m.matches();

    
   
163
    }

    
   
164

   

    
   
165

   
137
    /** check whether there is NO directory in the input file (status) list*/
166
    /** check whether there is NO directory in the input file (status) list*/
138
    public static boolean noDir(FileStatus [] ss) {
167
    public static boolean noDir(FileStatus [] ss) {
139
        for (FileStatus s : ss) {
168
        for (FileStatus s : ss) {
140
            if (s.isDir())
169
            if (s.isDir())
141
                return false;
170
                return false;
[+20] [20] 523 lines
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
Revision bd7a6d2 New Change
 
  1. contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java: Loading...
  2. contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java: Loading...