Review Board 1.7.22


Changes for PIG-3321

Review Request #11155 - Created May 14, 2013 and updated

Harvey Chong
Reviewers
pig
rohini
pig
Changes for https://issues.apache.org/jira/browse/PIG-3321

Overview:
AvroStorage.java - If 'schema' argument is passed to constructor, use it as the reader schema on load.  Moved getSchema() to AvroStorageUtils and made public+static, so it can be called from PigAvroRecordReader.
AvroStorageUtils.java - Moved getSchema() here.
PigAvroInputFormat.java - nothing functional here, just renamed 'schema' to 'readerSchema' for clarity.
PigAvroRecordReader.java - The constructor now determines the writer schema for its split, and passes both reader and writer schema to the PigAvroDatumReader constructor, which will allow the Avro code to resolve the two. 
PigAvroDatumReader.java - Changed readRecord() to add entries to the output Tuple in writer order rather than reader order.
TestAvroStorage.java - Added a new testcase for user specified schema in load.

 
http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
Revision 1482982 New Change
[20] 86 lines
[+20] [+] public class AvroStorage extends FileInputLoadFunc implements StoreFuncInterface, LoadMetadata {
87
    private boolean nullable = true;
87
    private boolean nullable = true;
88

    
   
88

   
89
    /* loadFunc parameters */
89
    /* loadFunc parameters */
90
    private PigAvroRecordReader reader = null;   /* avro record writer */
90
    private PigAvroRecordReader reader = null;   /* avro record writer */
91
    private Schema inputAvroSchema = null;    /* input avro schema */
91
    private Schema inputAvroSchema = null;    /* input avro schema */

    
   
92
    private Schema userSpecifiedAvroSchema = null;    /* avro schema specified in constructor args */
92

    
   
93

   
93
    /* if multiple avro record schemas are merged, this map associates each input
94
    /* if multiple avro record schemas are merged, this map associates each input
94
     * record with a remapping of its fields relative to the merged schema. please
95
     * record with a remapping of its fields relative to the merged schema. please
95
     * see AvroStorageUtils.getSchemaToMergedSchemaMap() for more details.
96
     * see AvroStorageUtils.getSchemaToMergedSchemaMap() for more details.
96
     */
97
     */
[+20] [20] 72 lines
[+20] [+] public void setLocation(String location, Job job) throws IOException {
169
     * @param conf  configuration
170
     * @param conf  configuration
170
     * @return avro schema
171
     * @return avro schema
171
     * @throws IOException
172
     * @throws IOException
172
     */
173
     */
173
    protected void setInputAvroSchema(Set<Path> paths, Configuration conf) throws IOException {
174
    protected void setInputAvroSchema(Set<Path> paths, Configuration conf) throws IOException {

    
   
175
        if(userSpecifiedAvroSchema != null) {

    
   
176
            inputAvroSchema = userSpecifiedAvroSchema;

    
   
177
        }

    
   
178
        else {
174
        inputAvroSchema = useMultipleSchemas ? getMergedSchema(paths, conf)
179
            inputAvroSchema = useMultipleSchemas ? getMergedSchema(paths, conf)
175
                                             : getAvroSchema(paths, conf);
180
                                                 : getAvroSchema(paths, conf);
176
    }
181
        }

    
   
182
    }
177

    
   
183

   
178
    /**
184
    /**
179
     * Get avro schema of first input file that matches the location pattern.
185
     * Get avro schema of first input file that matches the location pattern.
180
     *
186
     *
181
     * @param paths  set of input files
187
     * @param paths  set of input files
[+20] [20] 96 lines
[+20] [+] protected Schema getMergedSchema(Set<Path> paths, Configuration conf) throws IOException {
278
     * @param fs  file system
284
     * @param fs  file system
279
     * @return avro schema
285
     * @return avro schema
280
     * @throws IOException
286
     * @throws IOException
281
     */
287
     */
282
    protected Schema getSchema(Path path, FileSystem fs) throws IOException {
288
    protected Schema getSchema(Path path, FileSystem fs) throws IOException {
283
        /* get path of the last file */
289
        return AvroStorageUtils.getSchema(path, fs);
284
        Path lastFile = AvroStorageUtils.getLast(path, fs);

   
285

    
   

   
286
        /* read in file and obtain schema */

   
287
        GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();

   
288
        InputStream hdfsInputStream = fs.open(lastFile);

   
289
        DataFileStream<Object> avroDataStream = new DataFileStream<Object>(hdfsInputStream, avroReader);

   
290
        Schema ret = avroDataStream.getSchema();

   
291
        avroDataStream.close();

   
292

    
   

   
293
        return ret;

   
294
    }
290
    }
295

    
   
291

   
296
    /**
292
    /**
297
     * This method is called to return the schema of an avro schema file. This
293
     * This method is called to return the schema of an avro schema file. This
298
     * method is different than {@link #getSchema}, which returns the schema
294
     * method is different than {@link #getSchema}, which returns the schema
[+20] [20] 237 lines
[+20] [+] protected void init(Map<String, Object> inputs) throws IOException {
536
                /* use schema in the specified path as output schema */
532
                /* use schema in the specified path as output schema */
537
                Path path = new Path( ((String) value).trim());
533
                Path path = new Path( ((String) value).trim());
538
                AvroStorageLog.details("data path=" + path.toUri().toString());
534
                AvroStorageLog.details("data path=" + path.toUri().toString());
539
                FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
535
                FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
540
                outputAvroSchema = getAvroSchema(path, fs);
536
                outputAvroSchema = getAvroSchema(path, fs);

    
   
537
                userSpecifiedAvroSchema = outputAvroSchema;
541
            } else if (name.equalsIgnoreCase("nullable")) {
538
            } else if (name.equalsIgnoreCase("nullable")) {
542
                nullable = (Boolean) value;
539
                nullable = (Boolean) value;
543
            } else if (name.equalsIgnoreCase("schema")) {
540
            } else if (name.equalsIgnoreCase("schema")) {
544
                outputAvroSchema = Schema.parse((String) value);
541
                outputAvroSchema = Schema.parse((String) value);

    
   
542
                userSpecifiedAvroSchema = outputAvroSchema;
545
            } else if (name.equalsIgnoreCase("schema_uri")) {
543
            } else if (name.equalsIgnoreCase("schema_uri")) {
546
                /* use the contents of the specified path as output schema */
544
                /* use the contents of the specified path as output schema */
547
                Path path = new Path( ((String) value).trim());
545
                Path path = new Path( ((String) value).trim());
548
                AvroStorageLog.details("schema_uri path=" + path.toUri().toString());
546
                AvroStorageLog.details("schema_uri path=" + path.toUri().toString());
549
                FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
547
                FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
550
                outputAvroSchema = getSchemaFromFile(path, fs);
548
                outputAvroSchema = getSchemaFromFile(path, fs);

    
   
549
                userSpecifiedAvroSchema = outputAvroSchema;
551
            } else if (name.matches("field\\d+")) {
550
            } else if (name.matches("field\\d+")) {
552
                /*set schema of dth field */
551
                /*set schema of dth field */
553
                if (fields == null)
552
                if (fields == null)
554
                    fields = new ArrayList<Field>();
553
                    fields = new ArrayList<Field>();
555

    
   
554

   
[+20] [20] 179 lines
http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
Revision 1482982 New Change
 
http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java
Revision 1482982 New Change
 
http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
Revision 1482982 New Change
 
http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
Revision 1482982 New Change
 
http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
Revision 1482982 New Change
 
  1. http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java: Loading...
  2. http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java: Loading...
  3. http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java: Loading...
  4. http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java: Loading...
  5. http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java: Loading...
  6. http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java: Loading...