Changes for PIG-3321

Review Request #11155 - Created May 14, 2013 and updated

Harvey Chong
Reviewers
pig
rohini
pig
Changes for https://issues.apache.org/jira/browse/PIG-3321

Overview:
AvroStorage.java - If a 'schema' argument is passed to the constructor, use it as the reader schema on load. Moved getSchema() to AvroStorageUtils and made it public static so that it can be called from PigAvroRecordReader.
AvroStorageUtils.java - getSchema() now lives here.
PigAvroInputFormat.java - No functional change; renamed 'schema' to 'readerSchema' for clarity.
PigAvroRecordReader.java - The constructor now determines the writer schema for its split and passes both the reader and writer schemas to the PigAvroDatumReader constructor, which allows the Avro code to resolve the two.
PigAvroDatumReader.java - Changed readRecord() to add entries to the output Tuple in writer order rather than reader order.
TestAvroStorage.java - Added a new test case for a user-specified schema on load.
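
For reviewers less familiar with reader/writer schema resolution, here is a minimal plain-Java model of the idea behind the PigAvroRecordReader change. It is only a sketch and not the patch itself: it does not use the Avro API, and the class name SchemaResolutionSketch, the resolve() method, and the NO_DEFAULT marker are hypothetical names introduced for illustration. It models only the name-based rules for record fields: a field present in both schemas keeps the written value, a reader-only field takes the reader's default, and a writer-only field is dropped.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified model of record-field resolution; not Avro code. */
public class SchemaResolutionSketch {

    /** Marker for a reader field that declares no default value. */
    static final Object NO_DEFAULT = new Object();

    /**
     * Reshapes one record (written with the writer schema) to the reader schema.
     *
     * @param writerRecord field name -> value, as written in the file
     * @param readerFields reader field name -> default value, or NO_DEFAULT
     */
    static Map<String, Object> resolve(Map<String, Object> writerRecord,
                                       Map<String, Object> readerFields) {
        Map<String, Object> out = new LinkedHashMap<String, Object>();
        for (Map.Entry<String, Object> field : readerFields.entrySet()) {
            String name = field.getKey();
            if (writerRecord.containsKey(name)) {
                // Field exists in both schemas: keep the written value.
                out.put(name, writerRecord.get(name));
            } else if (field.getValue() != NO_DEFAULT) {
                // Field exists only in the reader schema: fall back to its default.
                out.put(name, field.getValue());
            } else {
                // Reader-only field without a default cannot be resolved.
                throw new IllegalStateException("No default for reader field: " + name);
            }
        }
        // Fields that exist only in the writer schema are never copied.
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> written = new LinkedHashMap<String, Object>();
        written.put("id", 7);
        written.put("legacy", "x");          // writer-only field

        Map<String, Object> reader = new LinkedHashMap<String, Object>();
        reader.put("id", NO_DEFAULT);
        reader.put("country", "unknown");    // reader-only field with a default

        System.out.println(resolve(written, reader)); // prints {id=7, country=unknown}
    }
}
```

In the patch, the schema given in the constructor plays the reader role and the schema stored in each split's data file plays the writer role; the actual resolution is left to the Avro library.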

 
http://svn.apache.org/repos/asf/pig/branches/branch-0.11/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
Revision 1482017 New Change
@@ -87,10 +87,11 @@ public class AvroStorage extends FileInputLoadFunc implements StoreFuncInterface, LoadMetadata {
     private boolean nullable = true;

     /* loadFunc parameters */
     private PigAvroRecordReader reader = null;   /* avro record writer */
     private Schema inputAvroSchema = null;    /* input avro schema */
+    private Schema customAvroSchema = null;    /* avro schema specified in constructor args */

     /* if multiple avro record schemas are merged, this map associates each input
      * record with a remapping of its fields relative to the merged schema. please
      * see AvroStorageUtils.getSchemaToMergedSchemaMap() for more details.
      */
@@ -169,13 +170,18 @@ public void setLocation(String location, Job job) throws IOException {
      * @param conf  configuration
      * @return avro schema
      * @throws IOException
      */
     protected void setInputAvroSchema(Set<Path> paths, Configuration conf) throws IOException {
-        inputAvroSchema = useMultipleSchemas ? getMergedSchema(paths, conf)
-                                             : getAvroSchema(paths, conf);
+        if(customAvroSchema != null) {
+            inputAvroSchema = customAvroSchema;
+        }
+        else {
+            inputAvroSchema = useMultipleSchemas ? getMergedSchema(paths, conf)
+                                                 : getAvroSchema(paths, conf);
+        }
     }

     /**
      * Get avro schema of first input file that matches the location pattern.
      *
      * @param paths  set of input files
@@ -211,18 +217,18 @@ protected Schema getAvroSchema(Set<Path> paths, Configuration conf) throws IOException {
         if (!fs.exists(path) || !AvroStorageUtils.PATH_FILTER.accept(path))
             return null;

         /* if path is first level directory or is a file */
         if (!fs.isDirectory(path)) {
-            return getSchema(path, fs);
+            return AvroStorageUtils.getSchema(path, fs);
         }

         FileStatus[] ss = fs.listStatus(path, AvroStorageUtils.PATH_FILTER);
         Schema schema = null;
         if (ss.length > 0) {
             if (AvroStorageUtils.noDir(ss))
-                return getSchema(path, fs);
+                return AvroStorageUtils.getSchema(path, fs);

             /*otherwise, check whether schemas of underlying directories are the same */
             for (FileStatus s : ss) {
                 Schema newSchema = getAvroSchema(s.getPath(), fs);
                 if (schema == null) {
@@ -256,46 +262,22 @@ protected Schema getAvroSchema(Set<Path> paths, Configuration conf) throws IOException {
     protected Schema getMergedSchema(Set<Path> paths, Configuration conf) throws IOException {
         Schema result = null;
         Map<Path, Schema> mergedFiles = new HashMap<Path, Schema>();
         for (Path path : paths) {
             FileSystem fs = FileSystem.get(path.toUri(), conf);
-            Schema schema = getSchema(path, fs);
+            Schema schema = AvroStorageUtils.getSchema(path, fs);
             result = AvroStorageUtils.mergeSchema(result, schema);
             mergedFiles.put(path, schema);
         }
         // schemaToMergedSchemaMap is only needed when merging multiple records.
         if (mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
             schemaToMergedSchemaMap = AvroStorageUtils.getSchemaToMergedSchemaMap(result, mergedFiles);
         }
         return result;
     }

     /**
-     * This method is called by {@link #getAvroSchema}. The default implementation
-     * returns the schema of an avro file; or the schema of the last file in a first-level
-     * directory (it does not contain sub-directories).
-     *
-     * @param path  path of a file or first level directory
-     * @param fs  file system
-     * @return avro schema
-     * @throws IOException
-     */
-    protected Schema getSchema(Path path, FileSystem fs) throws IOException {
-        /* get path of the last file */
-        Path lastFile = AvroStorageUtils.getLast(path, fs);
-
-        /* read in file and obtain schema */
-        GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
-        InputStream hdfsInputStream = fs.open(lastFile);
-        DataFileStream<Object> avroDataStream = new DataFileStream<Object>(hdfsInputStream, avroReader);
-        Schema ret = avroDataStream.getSchema();
-        avroDataStream.close();
-
-        return ret;
-    }
-
-    /**
      * This method is called to return the schema of an avro schema file. This
      * method is different than {@link #getSchema}, which returns the schema
      * from a data file.
      *
      * @param path  path of a file or first level directory
@@ -539,10 +521,11 @@ protected void init(Map<String, Object> inputs) throws IOException {
                 outputAvroSchema = getAvroSchema(path, fs);
             } else if (name.equalsIgnoreCase("nullable")) {
                 nullable = (Boolean) value;
             } else if (name.equalsIgnoreCase("schema")) {
                 outputAvroSchema = Schema.parse((String) value);
+                customAvroSchema = outputAvroSchema;
             } else if (name.matches("field\\d+")) {
                 /*set schema of dth field */
                 if (fields == null)
                     fields = new ArrayList<Field>();

http://svn.apache.org/repos/asf/pig/branches/branch-0.11/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
Revision 1482017 New Change
 
http://svn.apache.org/repos/asf/pig/branches/branch-0.11/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java
Revision 1482017 New Change
 
http://svn.apache.org/repos/asf/pig/branches/branch-0.11/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
Revision 1482017 New Change
 
http://svn.apache.org/repos/asf/pig/branches/branch-0.11/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
Revision 1482017 New Change
 
http://svn.apache.org/repos/asf/pig/branches/branch-0.11/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
Revision 1482017 New Change
 