Review Board 1.7.22


Diff of PIG-3460, a large refactoring of the Pig illustrate code.

Review Request #14146 - Created Sept. 15, 2013 and updated

Jeremy Karn
Reviewers
pig
pig-git
Diff of PIG-3460, a large refactoring of the Pig illustrate code.

 

Diff revision 1 (Latest)

  1. src/org/apache/pig/PigServer.java: Loading...
  2. src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java: Loading...
  3. src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java: Loading...
  4. src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java: Loading...
  5. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java: Loading...
  6. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java: Loading...
  7. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java: Loading...
  8. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java: Loading...
  9. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java: Loading...
  10. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java: Loading...
  11. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java: Loading...
  12. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java: Loading...
  13. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java: Loading...
  14. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java: Loading...
  15. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java: Loading...
  16. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java: Loading...
  17. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java: Loading...
  18. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java: Loading...
  19. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java: Loading...
  20. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java: Loading...
This diff has been split across 3 pages: 1 2 3 >
src/org/apache/pig/PigServer.java
Revision 2306ff0 New Change
[20] 41 lines
[+20]
42
import java.util.concurrent.atomic.AtomicInteger;
42
import java.util.concurrent.atomic.AtomicInteger;
43

    
   
43

   
44
import org.apache.commons.logging.Log;
44
import org.apache.commons.logging.Log;
45
import org.apache.commons.logging.LogFactory;
45
import org.apache.commons.logging.LogFactory;
46
import org.apache.hadoop.conf.Configuration;
46
import org.apache.hadoop.conf.Configuration;

    
   
47
import org.apache.hadoop.fs.s3.S3Exception;
47
import org.apache.log4j.Level;
48
import org.apache.log4j.Level;
48
import org.apache.log4j.Logger;
49
import org.apache.log4j.Logger;
49
import org.apache.pig.backend.datastorage.ContainerDescriptor;
50
import org.apache.pig.backend.datastorage.ContainerDescriptor;
50
import org.apache.pig.backend.datastorage.DataStorage;
51
import org.apache.pig.backend.datastorage.DataStorage;
51
import org.apache.pig.backend.datastorage.ElementDescriptor;
52
import org.apache.pig.backend.datastorage.ElementDescriptor;
52
import org.apache.pig.backend.executionengine.ExecException;
53
import org.apache.pig.backend.executionengine.ExecException;
53
import org.apache.pig.backend.executionengine.ExecJob;
54
import org.apache.pig.backend.executionengine.ExecJob;
54
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
55
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
55
import org.apache.pig.backend.hadoop.executionengine.HJob;
56
import org.apache.pig.backend.hadoop.executionengine.HJob;
56
import org.apache.pig.builtin.PigStorage;
57
import org.apache.pig.builtin.PigStorage;
57
import org.apache.pig.classification.InterfaceAudience;
58
import org.apache.pig.classification.InterfaceAudience;
58
import org.apache.pig.classification.InterfaceStability;
59
import org.apache.pig.classification.InterfaceStability;
59
import org.apache.pig.data.DataBag;

   
60
import org.apache.pig.data.Tuple;
60
import org.apache.pig.data.Tuple;
61
import org.apache.pig.impl.PigContext;
61
import org.apache.pig.impl.PigContext;
62
import org.apache.pig.impl.io.FileLocalizer;
62
import org.apache.pig.impl.io.FileLocalizer;
63
import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
63
import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
64
import org.apache.pig.impl.logicalLayer.FrontendException;
64
import org.apache.pig.impl.logicalLayer.FrontendException;
[+20] [20] 27 lines
[+20]
92
import org.apache.pig.newplan.logical.visitor.TypeCheckingRelVisitor;
92
import org.apache.pig.newplan.logical.visitor.TypeCheckingRelVisitor;
93
import org.apache.pig.newplan.logical.visitor.UnionOnSchemaSetter;
93
import org.apache.pig.newplan.logical.visitor.UnionOnSchemaSetter;
94
import org.apache.pig.parser.QueryParserDriver;
94
import org.apache.pig.parser.QueryParserDriver;
95
import org.apache.pig.parser.QueryParserUtils;
95
import org.apache.pig.parser.QueryParserUtils;
96
import org.apache.pig.pen.ExampleGenerator;
96
import org.apache.pig.pen.ExampleGenerator;

    
   
97
import org.apache.pig.pen.util.IllustrateResult;
97
import org.apache.pig.scripting.ScriptEngine;
98
import org.apache.pig.scripting.ScriptEngine;
98
import org.apache.pig.tools.grunt.GruntParser;
99
import org.apache.pig.tools.grunt.GruntParser;
99
import org.apache.pig.tools.pigstats.JobStats;
100
import org.apache.pig.tools.pigstats.JobStats;
100
import org.apache.pig.tools.pigstats.OutputStats;
101
import org.apache.pig.tools.pigstats.OutputStats;
101
import org.apache.pig.tools.pigstats.PigStats;
102
import org.apache.pig.tools.pigstats.PigStats;
102
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
103
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
103
import org.apache.pig.tools.pigstats.ScriptState;
104
import org.apache.pig.tools.pigstats.ScriptState;

    
   
105
import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
104

    
   
106

   
105
/**
107
/**
106
 *
108
 *
107
 * A class for Java programs to connect to Pig. Typically a program will create a PigServer
109
 * A class for Java programs to connect to Pig. Typically a program will create a PigServer
108
 * instance. The programmer then registers queries using registerQuery() and
110
 * instance. The programmer then registers queries using registerQuery() and
[+20] [20] 1107 lines
[+20] [+] public void shutdown() {
1216
     */
1218
     */
1217
    public Set<String> getAliasKeySet() {
1219
    public Set<String> getAliasKeySet() {
1218
        return currDAG.getAliasOp().keySet();
1220
        return currDAG.getAliasOp().keySet();
1219
    }
1221
    }
1220

    
   
1222

   
1221
    public Map<Operator, DataBag> getExamples(String alias) throws IOException {
1223
    public IllustrateResult getExamples(String alias) throws IOException {

    
   
1224
        return getExamples(alias, false);

    
   
1225
    }

    
   
1226
    

    
   
1227
    public IllustrateResult getExamples(String alias, boolean skipPruning) throws IOException {

    
   
1228
        return getExamples(alias, skipPruning, System.out, null);

    
   
1229
    }

    
   
1230
    

    
   
1231
    public IllustrateResult getExamples(String alias, Boolean skipPruning, PrintStream outStream, String format) 

    
   
1232
        throws IOException {

    
   
1233
        MRScriptState.get().emitIllustrateBuildingPlanNotification();

    
   
1234
        log.info("Illustrate: Building Plan");
1222
        try {
1235
        try {
1223
            if (currDAG.isBatchOn() && alias != null) {
1236
            if (currDAG.isBatchOn() && alias != null) {
1224
                currDAG.parseQuery();
1237
                currDAG.parseQuery();
1225
                currDAG.buildPlan( null );
1238
                currDAG.buildPlan( null );
1226
                execute();

   
1227
            }
1239
            }
1228
            currDAG.parseQuery();
1240
            currDAG.parseQuery();
1229
            currDAG.buildPlan( alias );
1241
            currDAG.buildPlan( alias, true ); // true signals to allow non-store leaves if alias null
1230
            currDAG.compile();
1242
                                              // used for alias-less illustrate
1231
        } catch (IOException e) {
1243
            currDAG.compile();     
1232
            //Since the original script is parsed anyway, there should not be an
1244
            
1233
            //error in this parsing. The only reason there can be an error is when

   
1234
            //the files being loaded in load don't exist anymore.

   
1235
            e.printStackTrace();

   
1236
        }

   
1237

    
   

   
1238
        ExampleGenerator exgen = new ExampleGenerator( currDAG.lp, pigContext );
1245
            ExampleGenerator exgen = new ExampleGenerator( currDAG.lp, pigContext );
1239
        try {
1246
            IllustrateResult result = exgen.getExamples(skipPruning);
1240
            return exgen.getExamples();
1247
            
1241
        } catch (ExecException e) {
1248
            if (outStream != null) {
1242
            e.printStackTrace(System.out);
1249
                if (format != null && format.equals("json")) {
1243
            throw new IOException("ExecException" , e);
1250
                    outStream.print(result.toJson());

    
   
1251
                } else {

    
   
1252
                    outStream.print(result.toString());

    
   
1253
                }

    
   
1254
            }

    
   
1255
            

    
   
1256
            return result;

    
   
1257
        } catch (S3Exception e) {

    
   
1258
            throw new IOException(e);
1244
        } catch (Exception e) {
1259
        } catch (Exception e) {
1245
            e.printStackTrace(System.out);
1260
            throw new IOException(e);
1246
            throw new IOException("Exception ", e);

   
1247
        }
1261
        }
1248

    
   
1262

   
1249
    }
1263
    }
1250

    
   
1264

   
1251
    public void printHistory(boolean withNumbers) {
1265
    public void printHistory(boolean withNumbers) {
[+20] [20] 203 lines
[+20] [+] public LogicalPlan getPlan(String alias) throws IOException {
1455
         * Build a plan for the given alias. Extra branches and child branch under alias
1469
         * Build a plan for the given alias. Extra branches and child branch under alias
1456
         * will be ignored. Dependent branch (i.e. scalar) will be kept.
1470
         * will be ignored. Dependent branch (i.e. scalar) will be kept.
1457
         * @throws IOException
1471
         * @throws IOException
1458
         */
1472
         */
1459
        void buildPlan(String alias) throws IOException {
1473
        void buildPlan(String alias) throws IOException {
1460
            if( alias == null )
1474
            buildPlan(alias, false);

    
   
1475
        }

    
   
1476
        

    
   
1477
        /**

    
   
1478
         * If an alias is specified, build a plan for that alias and its ancestors.

    
   
1479
         * If alias is null and isIllustrate is false, build a plan from all STORE statements.

    
   
1480
         * If alias is null and isIllustrate is true, build a plan from all sink aliases,

    
   
1481
         *   except use the immediate ancestor of STORE statements instead of the STOREs themselves.

    
   
1482
         *

    
   
1483
         * @throws IOException 

    
   
1484
         * @param alias 

    
   
1485
         * @param isIllustrate

    
   
1486
         */

    
   
1487
        void buildPlan(String alias, boolean isIllustrate) throws IOException {

    
   
1488
            if( alias == null)
1461
                skipStores();
1489
                skipStores();
1462

    
   
1490

   
1463
            final Queue<Operator> queue = new LinkedList<Operator>();
1491
            final Queue<Operator> queue = new LinkedList<Operator>();
1464
            if( alias != null ) {
1492
            if( alias != null ) {
1465
                Operator op = getOperator( alias );
1493
                Operator op = getOperator( alias );
[+20] [20] 4 lines
[+20] public LogicalPlan getPlan(String alias) throws IOException {
1470
                queue.add( op );
1498
                queue.add( op );
1471
            } else {
1499
            } else {
1472
                List<Operator> sinks = lp.getSinks();
1500
                List<Operator> sinks = lp.getSinks();
1473
                if( sinks != null ) {
1501
                if( sinks != null ) {
1474
                    for( Operator sink : sinks ) {
1502
                    for( Operator sink : sinks ) {
1475
                        if( sink instanceof LOStore )
1503
                        if (isIllustrate) {
1476
                            queue.add( sink );
1504
                            if (sink instanceof LOStore) {

    
   
1505
                                // getPredecessors returns only immediate predecessors (one step up)

    
   
1506
                                for (Operator predecessor : lp.getPredecessors(sink)) {

    
   
1507
                                    queue.add(predecessor);

    
   
1508
                                }

    
   
1509
                            } else {

    
   
1510
                                queue.add(sink);

    
   
1511
                            }

    
   
1512
                        } else if (sink instanceof LOStore) {

    
   
1513
                            queue.add(sink);

    
   
1514
                        }
1477
                    }
1515
                    }
1478
                }
1516
                }
1479
            }
1517
            }
1480

    
   
1518

   
1481
            LogicalPlan plan = new LogicalPlan();
1519
            LogicalPlan plan = new LogicalPlan();
[+20] [20] 344 lines
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
Revision 9341153 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
Revision 64f0ee1 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
Revision 933363d New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java
Revision b3f51da New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
Revision c355d1d New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
Revision 2246b50 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
Revision cf16cca New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
Revision bb93273 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
Revision d6f4a4f New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
Revision eb9f62a New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
Revision 82f11ac New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
Revision ff5e644 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
Revision 426b79b New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
Revision bbf11c4 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
Revision 1ac3043 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
Revision d9767e1 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
Revision 86314d9 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
Revision c200715 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
Revision c390136 New Change
 
  1. src/org/apache/pig/PigServer.java: Loading...
  2. src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java: Loading...
  3. src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java: Loading...
  4. src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java: Loading...
  5. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java: Loading...
  6. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java: Loading...
  7. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java: Loading...
  8. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java: Loading...
  9. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java: Loading...
  10. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java: Loading...
  11. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java: Loading...
  12. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java: Loading...
  13. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java: Loading...
  14. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java: Loading...
  15. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java: Loading...
  16. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java: Loading...
  17. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java: Loading...
  18. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java: Loading...
  19. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java: Loading...
  20. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java: Loading...
This diff has been split across 3 pages: 1 2 3 >