Review Board 1.7.22


PIG-3536 Implement DISTINCT for Pig-on-Tez

Review Request #15219 - Created Nov. 5, 2013 and updated

Alex Bain
tez
PIG-3536
Reviewers
pig
cheolsoo, daijy, mwagner, rohini
pig-git
Implement DISTINCT for Pig-on-Tez by providing a (very straightforward) implementation in TezCompiler.java.

For the moment, this does NOT use two optimizations done in the MRCompiler. We will create a separate JIRA for these optimizations:
1. A distinct combiner
2. A combiner optimizer that replaces certain uses of DISTINCT with an algebraic udf

[Little code note: I changed the name of getPlainForEach to getForEachPlain. That way we can have getForEachHelper1, getForEachHelper2, etc. all follow alphabetically. Sorry if that's a little too OCD.]
This patch includes:
-A unit test in TestTezCompiler.java
-An e2e test

DANIEL: Can you check that my e2e test looks appropriate? I wasn't sure which test data set to choose, I just picked studenttab20m.
src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
Revision d62b2a1 New Change
[20] 468 lines
[+20] [+] public void visitCross(POCross op) throws VisitorException {
469
        throw new TezCompilerException(msg, errCode, PigException.BUG);
469
        throw new TezCompilerException(msg, errCode, PigException.BUG);
470
    }
470
    }
471

    
   
471

   
472
    @Override
472
    @Override
473
    public void visitDistinct(PODistinct op) throws VisitorException {
473
    public void visitDistinct(PODistinct op) throws VisitorException {

    
   
474
        try {

    
   
475
            POLocalRearrange lr = getLocalRearrange();

    
   
476
            lr.setDistinct(true);

    
   
477
            curTezOp.plan.addAsLeaf(lr);

    
   
478
            curTezOp.customPartitioner = op.getCustomPartitioner();

    
   
479

   

    
   
480
            // Mark the start of a new TezOperator, connecting the inputs.

    
   
481
            // TODO add distinct combiner as an optimization when supported by Tez

    
   
482
            blocking();

    
   
483

   

    
   
484
            POPackage pkg = getPackage();

    
   
485
            pkg.setDistinct(true);

    
   
486
            curTezOp.plan.add(pkg);

    
   
487

   
Moved from 751

    
   
488
            POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
Moved from 752

    
   
489
            project.setResultType(DataType.TUPLE);
Moved from 753

    
   
490
            project.setStar(false);

    
   
491
            project.setColumn(0);

    
   
492
            project.setOverloaded(false);

    
   
493

   

    
   
494
            // Note that the PODistinct is not actually added to any Tez vertex, but rather is

    
   
495
            // implemented by the action of the local rearrange, shuffle and project operations.

    
   
496
            POForEach forEach = getForEach(project, op.getRequestedParallelism());

    
   
497
            curTezOp.plan.addAsLeaf(forEach);

    
   
498
            phyToTezOpMap.put(op, curTezOp);

    
   
499
        } catch (Exception e) {
474
        int errCode = 2034;
500
            int errCode = 2034;
475
        String msg = "Cannot compile " + op.getClass().getSimpleName();
501
            String msg = "Cannot compile " + op.getClass().getSimpleName();
476
        throw new TezCompilerException(msg, errCode, PigException.BUG);
502
            throw new TezCompilerException(msg, errCode, PigException.BUG);
477
    }
503
        }

    
   
504
    }
478

    
   
505

   
479
    @Override
506
    @Override
480
    public void visitFilter(POFilter op) throws VisitorException {
507
    public void visitFilter(POFilter op) throws VisitorException {
481
        try {
508
        try {
482
            nonBlocking(op);
509
            nonBlocking(op);
[+20] [20] 37 lines
[+20] [+] public void visitLimit(POLimit op) throws VisitorException {
520
            // TODO Explicitly set the parallelism once this is supported by TezOperator.
547
            // TODO Explicitly set the parallelism once this is supported by TezOperator.
521
            blocking();
548
            blocking();
522

    
   
549

   
523
            // Then add a POPackage and a POForEach to the start of the new tezOp.
550
            // Then add a POPackage and a POForEach to the start of the new tezOp.
524
            POPackage pkg = getPackage();
551
            POPackage pkg = getPackage();
525
            POForEach forEach = getPlainForEach();
552
            POForEach forEach = getForEachPlain();
526
            curTezOp.plan.add(pkg);
553
            curTezOp.plan.add(pkg);
527
            curTezOp.plan.addAsLeaf(forEach);
554
            curTezOp.plan.addAsLeaf(forEach);
528

    
   
555

   
529
            if (!pigContext.inIllustrator) {
556
            if (!pigContext.inIllustrator) {
530
                POLimit limitCopy = new POLimit(new OperatorKey(scope, nig.getNextNodeId(scope)));
557
                POLimit limitCopy = new POLimit(new OperatorKey(scope, nig.getNextNodeId(scope)));
[+20] [20] 172 lines
[+20] [+] public void visitUnion(POUnion op) throws VisitorException {
703
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
730
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
704
            throw new TezCompilerException(msg, errCode, PigException.BUG, e);
731
            throw new TezCompilerException(msg, errCode, PigException.BUG, e);
705
        }
732
        }
706
    }
733
    }
707

    
   
734

   

    
   
735
    private POForEach getForEach(POProject project, int rp) {

    
   
736
        PhysicalPlan forEachPlan = new PhysicalPlan();

    
   
737
        forEachPlan.add(project);

    
   
738

   

    
   
739
        List<PhysicalPlan> forEachPlans = Lists.newArrayList();

    
   
740
        forEachPlans.add(forEachPlan);

    
   
741

   
Moved from 763

    
   
742
        List<Boolean> flatten = Lists.newArrayList();
Moved from 764

    
   
743
        flatten.add(true);

    
   
744

   

    
   
745
        POForEach forEach = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), rp, forEachPlans, flatten);
Moved from 767

    
   
746
        forEach.setResultType(DataType.BAG);
Moved from 768

    
   
747
        return forEach;
Moved from 769

    
   
748
    }

    
   
749

   

    
   
750
    // Get a plain POForEach: ForEach X generate flatten($1)

    
   
751
    private POForEach getForEachPlain() {
Moved from 751

    
   
752
        POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
Moved from 752

    
   
753
        project.setResultType(DataType.TUPLE);
Moved from 753

    
   
754
        project.setStar(false);
Moved from 754

    
   
755
        project.setColumn(1);
Moved from 755

    
   
756
        project.setOverloaded(true);

    
   
757
        return getForEach(project, -1);

    
   
758
    }

    
   
759

   
708
    private POLoad getLoad() {
760
    private POLoad getLoad() {
709
        POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope)));
761
        POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope)));
710
        ld.setPc(pigContext);
762
        ld.setPc(pigContext);
711
        ld.setIsTmpLoad(true);
763
        ld.setIsTmpLoad(true);
712
        return ld;
764
        return ld;
[+20] [20] 31 lines
[+20] [+] private POPackage getPackage() {
744
        pkg.setKeyType(DataType.TUPLE);
796
        pkg.setKeyType(DataType.TUPLE);
745
        pkg.setNumInps(1);
797
        pkg.setNumInps(1);
746
        return pkg;
798
        return pkg;
747
    }
799
    }
748

    
   
800

   
749
    // Get a simple POForEach: ForEach X generate flatten($1)

   
750
    private POForEach getPlainForEach() {

   
751
        POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
Moved to 752

   
752
        project.setResultType(DataType.TUPLE);
Moved to 753

   
753
        project.setStar(false);
Moved to 754

   
754
        project.setColumn(1);
Moved to 755

   
755
        project.setOverloaded(true);
Moved to 756

   
756

    
   

   
757
        PhysicalPlan addPlan = new PhysicalPlan();

   
758
        addPlan.add(project);

   
759

    
   

   
760
        List<PhysicalPlan> addPlans = Lists.newArrayList();

   
761
        addPlans.add(addPlan);

   
762

    
   

   
763
        List<Boolean> flatten = Lists.newArrayList();
Moved to 742

   
764
        flatten.add(true);
Moved to 743

   
765

    
   

   
766
        POForEach forEach = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, addPlans, flatten);

   
767
        forEach.setResultType(DataType.BAG);
Moved to 746

   
768
        return forEach;
Moved to 747

   
769
    }
Moved to 748

   
770

    
   

   
771
    private TezOperator getTezOp() {
801
    private TezOperator getTezOp() {
772
        return new TezOperator(new OperatorKey(scope, nig.getNextNodeId(scope)));
802
        return new TezOperator(new OperatorKey(scope, nig.getNextNodeId(scope)));
773
    }
803
    }
774
}
804
}
775

    
   
805

   
test/e2e/pig/tests/tez.conf
Revision 24af8d3 New Change
 
test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
New File
 
test/org/apache/pig/tez/TestTezCompiler.java
Revision 1209d08 New Change
 
  1. src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java: Loading...
  2. test/e2e/pig/tests/tez.conf: Loading...
  3. test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld: Loading...
  4. test/org/apache/pig/tez/TestTezCompiler.java: Loading...