Review Board 1.7.22


PIG-3538 Implement LIMIT in Tez

Review Request #14897 - Created Oct. 24, 2013 and updated

Alex Bain
tez
PIG-3538
Reviewers
pig
cheolsoo, daijy, mwagner, rohini
pig-git
Implement LIMIT in Tez by providing an implementation of visitLimit in TezCompiler.java.

UPDATED (Oct 24 4:37 PM):
1. I added a test to TestTezCompiler.java and a golden (.gld) file
2. I included Daniel's patch for a new e2e test
[abain@abain-ld pig]$ cat data/1.dat
1,orange
2,apple
3,strawberry

[abain@abain-ld pig]$ cat test3.pig
a = load './1.dat' using PigStorage(',') as (id:int, fruit:chararray);
b = LIMIT a 2;
STORE b INTO 'foo';

I ran with "pig -x tez -f test3.pig" and got the following (correct) results:

[abain@abain-ld pig]$ hadoop fs -ls /user/abain/foo
Found 2 items
-rw-r--r--   1 abain supergroup          0 2013-10-23 18:38 /user/abain/foo/_SUCCESS
-rw-r--r--   1 abain supergroup         17 2013-10-23 18:38 /user/abain/foo/part-r-00000

[abain@abain-ld pig]$ hadoop fs -cat /user/abain/foo/part-r-00000
1	orange
2	apple

UPDATED (Oct 24 4:37 PM):
1. ant -Dtestcase=TestTezCompiler test passes
2. I ran test-e2e-tez. The new test seems to pass (although something else failed).
src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
Revision 0c20214 New Change
[20] 15 lines
[+20]
16
 * limitations under the License.
16
 * limitations under the License.
17
 */
17
 */
18
package org.apache.pig.backend.hadoop.executionengine.tez;
18
package org.apache.pig.backend.hadoop.executionengine.tez;
19

    
   
19

   
20
import java.io.IOException;
20
import java.io.IOException;

    
   
21
import java.util.ArrayList;
21
import java.util.Collections;
22
import java.util.Collections;
22
import java.util.Iterator;
23
import java.util.Iterator;
23
import java.util.List;
24
import java.util.List;
24
import java.util.Map;
25
import java.util.Map;
25
import java.util.Random;
26
import java.util.Random;
26
import java.util.Set;
27
import java.util.Set;
27

    
   
28

   
28
import org.apache.commons.logging.Log;
29
import org.apache.commons.logging.Log;
29
import org.apache.commons.logging.LogFactory;
30
import org.apache.commons.logging.LogFactory;
30
import org.apache.pig.PigException;
31
import org.apache.pig.PigException;

    
   
32
import org.apache.pig.backend.executionengine.ExecException;
31
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
33
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
32
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
34
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
33
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
35
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;

    
   
36
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
34
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
37
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
35
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
38
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
36
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
39
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
37
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
40
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
38
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
41
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
[+20] [20] 16 lines
[+20]
55
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
58
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
56
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
59
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
57
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
60
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
58
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
61
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
59
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
62
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;

    
   
63
import org.apache.pig.data.DataType;
60
import org.apache.pig.impl.PigContext;
64
import org.apache.pig.impl.PigContext;
61
import org.apache.pig.impl.io.FileLocalizer;
65
import org.apache.pig.impl.io.FileLocalizer;
62
import org.apache.pig.impl.io.FileSpec;
66
import org.apache.pig.impl.io.FileSpec;
63
import org.apache.pig.impl.plan.DepthFirstWalker;
67
import org.apache.pig.impl.plan.DepthFirstWalker;
64
import org.apache.pig.impl.plan.NodeIdGenerator;
68
import org.apache.pig.impl.plan.NodeIdGenerator;
[+20] [20] 225 lines
[+20] [+] private void compile(PhysicalOperator op) throws IOException, PlanException, VisitorException {
290
            curTezOp.requestedParallelism = op.getRequestedParallelism();
294
            curTezOp.requestedParallelism = op.getRequestedParallelism();
291
        }
295
        }
292
        compiledInputs = prevCompInp;
296
        compiledInputs = prevCompInp;
293
    }
297
    }
294

    
   
298

   
295
    private TezOperator getTezOp() {

   
296
        return new TezOperator(new OperatorKey(scope, nig.getNextNodeId(scope)));

   
297
    }

   
298

    
   

   
299
    private POLoad getLoad() {
Moved to 709

   
300
        POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope)));
Moved to 710

   
301
        ld.setPc(pigContext);
Moved to 711

   
302
        ld.setIsTmpLoad(true);
Moved to 712

   
303
        return ld;
Moved to 713

   
304
    }
Moved to 714

   
305

    
   

   
306
    /**
299
    /**
307
     * Starts a new TezOperator and connects it to the old one by load-store.
300
     * Starts a new TezOperator and connects it to the old one by load-store.
308
     * The assumption is that the store is already inserted into the old
301
     * The assumption is that the store is already inserted into the old
309
     * TezOperator.
302
     * TezOperator.
310
     * @param fSpec
303
     * @param fSpec
[+20] [20] 26 lines
[+20] [+] private void nonBlocking(PhysicalOperator op) throws PlanException, IOException {
337
        }
330
        }
338
        tezOp.plan.addAsLeaf(op);
331
        tezOp.plan.addAsLeaf(op);
339
        curTezOp = tezOp;
332
        curTezOp = tezOp;
340
    }
333
    }
341

    
   
334

   
342
    private void blocking(PhysicalOperator op) throws IOException, PlanException {
335
    private void blocking() throws IOException, PlanException {
343
        TezOperator newTezOp = getTezOp();
336
        TezOperator newTezOp = getTezOp();
344
        tezPlan.add(newTezOp);
337
        tezPlan.add(newTezOp);
345
        for (TezOperator tezOp : compiledInputs) {
338
        for (TezOperator tezOp : compiledInputs) {
346
            tezOp.setClosed(true);
339
            tezOp.setClosed(true);
347
            tezPlan.connect(tezOp, newTezOp);
340
            tezPlan.connect(tezOp, newTezOp);
[+20] [20] 156 lines
[+20] [+] public void visitFRJoin(POFRJoin op) throws VisitorException {
504
        throw new TezCompilerException(msg, errCode, PigException.BUG);
497
        throw new TezCompilerException(msg, errCode, PigException.BUG);
505
    }
498
    }
506

    
   
499

   
507
    @Override
500
    @Override
508
    public void visitLimit(POLimit op) throws VisitorException {
501
    public void visitLimit(POLimit op) throws VisitorException {

    
   
502
        try {

    
   
503
            if (op.getLimitPlan() != null) {

    
   
504
                processUDFs(op.getLimitPlan());

    
   
505
            }

    
   
506
            

    
   
507
            // As an optimization, we'll add a limit to the end of the last tezOp. Note that in

    
   
508
            // some cases, such as when we have ORDER BY followed by LIMIT, the LimitOptimizer has

    
   
509
            // already removed the POLimit from the physical plan.

    
   
510
            if (!pigContext.inIllustrator) {

    
   
511
                nonBlocking(op);

    
   
512
                phyToTezOpMap.put(op, curTezOp);

    
   
513
            }

    
   
514
            

    
   
515
            // Need to add POLocalRearrange to the end of the last tezOp before we shuffle.

    
   
516
            POLocalRearrange lr = getLocalRearrange();

    
   
517
            curTezOp.plan.addAsLeaf(lr);

    
   
518
            

    
   
519
            // Mark the start of a new TezOperator, connecting the inputs. Note the parallelism is

    
   
520
            // currently fixed to 1 for all TezOperators.

    
   
521
            // TODO Explicitly set the parallelism once this is supported by TezOperator.

    
   
522
            blocking();

    
   
523
            

    
   
524
            // Then add a POPackage and a POForEach to the start of the new tezOp.

    
   
525
            POPackage pkg = getPackage();

    
   
526
            POForEach forEach = getPlainForEach();

    
   
527
            curTezOp.plan.add(pkg);

    
   
528
            curTezOp.plan.addAsLeaf(forEach);

    
   
529
            

    
   
530
            if (!pigContext.inIllustrator) {

    
   
531
                POLimit limitCopy = new POLimit(new OperatorKey(scope, nig.getNextNodeId(scope)));

    
   
532
                limitCopy.setLimit(op.getLimit());

    
   
533
                limitCopy.setLimitPlan(op.getLimitPlan());

    
   
534
                curTezOp.plan.addAsLeaf(limitCopy);

    
   
535
            } else {

    
   
536
                curTezOp.plan.addAsLeaf(op);

    
   
537
            }

    
   
538
        } catch (Exception e) {
509
        int errCode = 2034;
539
            int errCode = 2034;
510
        String msg = "Cannot compile " + op.getClass().getSimpleName();
540
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
511
        throw new TezCompilerException(msg, errCode, PigException.BUG);
541
            throw new TezCompilerException(msg, errCode, PigException.BUG, e);

    
   
542
        }
512
    }
543
    }
513

    
   
544

   
514
    @Override
545
    @Override
515
    public void visitLoad(POLoad op) throws VisitorException {
546
    public void visitLoad(POLoad op) throws VisitorException {
516
        try {
547
        try {
[+20] [20] 25 lines
[+20] [+] public void visitLocalRearrange(POLocalRearrange op) throws VisitorException {
542
    }
573
    }
543

    
   
574

   
544
    @Override
575
    @Override
545
    public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException {
576
    public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException {
546
        try {
577
        try {
547
            blocking(op);
578
            blocking();
548
            curTezOp.customPartitioner = op.getCustomPartitioner();
579
            curTezOp.customPartitioner = op.getCustomPartitioner();
549
            phyToTezOpMap.put(op, curTezOp);
580
            phyToTezOpMap.put(op, curTezOp);
550
        } catch (Exception e) {
581
        } catch (Exception e) {
551
            int errCode = 2034;
582
            int errCode = 2034;
552
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
583
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
[+20] [20] 119 lines
[+20] [+] public void visitUnion(POUnion op) throws VisitorException {
672
            int errCode = 2034;
703
            int errCode = 2034;
673
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
704
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
674
            throw new TezCompilerException(msg, errCode, PigException.BUG, e);
705
            throw new TezCompilerException(msg, errCode, PigException.BUG, e);
675
        }
706
        }
676
    }
707
    }

    
   
708

   
Moved from 299

    
   
709
    private POLoad getLoad() {
Moved from 300

    
   
710
        POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope)));
Moved from 301

    
   
711
        ld.setPc(pigContext);
Moved from 302

    
   
712
        ld.setIsTmpLoad(true);
Moved from 303

    
   
713
        return ld;
Moved from 304

    
   
714
    }

    
   
715

   

    
   
716
    private POLocalRearrange getLocalRearrange() throws PlanException {

    
   
717
        POProject projectStar = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));

    
   
718
        projectStar.setResultType(DataType.TUPLE);

    
   
719
        projectStar.setStar(true);

    
   
720
        

    
   
721
        PhysicalPlan addPlan = new PhysicalPlan();

    
   
722
        addPlan.add(projectStar);

    
   
723
        

    
   
724
        List<PhysicalPlan> addPlans = Lists.newArrayList();

    
   
725
        addPlans.add(addPlan);

    
   
726
        

    
   
727
        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope, nig.getNextNodeId(scope)));

    
   
728
        try {

    
   
729
            lr.setIndex(0);

    
   
730
        } catch (ExecException e) {

    
   
731
            int errCode = 2058;

    
   
732
            String msg = "Unable to set index on the newly created POLocalRearrange.";

    
   
733
            throw new PlanException(msg, errCode, PigException.BUG, e);

    
   
734
        }

    
   
735
        lr.setKeyType(DataType.TUPLE);

    
   
736
        lr.setPlans(addPlans);

    
   
737
        lr.setResultType(DataType.TUPLE);

    
   
738
        return lr;

    
   
739
    }

    
   
740

   

    
   
741
    private POPackage getPackage() {

    
   
742
        boolean[] inner = { false };

    
   
743
        POPackage pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));

    
   
744
        pkg.setInner(inner);

    
   
745
        pkg.setKeyType(DataType.TUPLE);

    
   
746
        pkg.setNumInps(1);

    
   
747
        return pkg;

    
   
748
    }

    
   
749

   

    
   
750
    // Get a simple POForEach: ForEach X generate flatten($1)

    
   
751
    private POForEach getPlainForEach() {

    
   
752
        POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));

    
   
753
        project.setResultType(DataType.TUPLE);

    
   
754
        project.setStar(false);

    
   
755
        project.setColumn(1);

    
   
756
        project.setOverloaded(true);

    
   
757
        

    
   
758
        PhysicalPlan addPlan = new PhysicalPlan();

    
   
759
        addPlan.add(project);

    
   
760
        

    
   
761
        List<PhysicalPlan> addPlans = Lists.newArrayList();

    
   
762
        addPlans.add(addPlan);

    
   
763
        

    
   
764
        List<Boolean> flatten = Lists.newArrayList();

    
   
765
        flatten.add(true);

    
   
766
        

    
   
767
        POForEach forEach = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, addPlans, flatten);

    
   
768
        forEach.setResultType(DataType.BAG);

    
   
769
        return forEach;

    
   
770
    }

    
   
771

   

    
   
772
    private TezOperator getTezOp() {

    
   
773
        return new TezOperator(new OperatorKey(scope, nig.getNextNodeId(scope)));

    
   
774
    }
677
}
775
}
678

    
   
776

   
test/e2e/pig/tests/tez.conf
Revision 5edc093 New Change
 
test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld
New File
 
test/org/apache/pig/tez/TestTezCompiler.java
Revision ef51876 New Change
 
test/org/apache/pig/tez/TestTezJobControlCompiler.java
Revision 0a23513 New Change
 
  1. src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java: Loading...
  2. test/e2e/pig/tests/tez.conf: Loading...
  3. test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld: Loading...
  4. test/org/apache/pig/tez/TestTezCompiler.java: Loading...
  5. test/org/apache/pig/tez/TestTezJobControlCompiler.java: Loading...