Review Board 1.7.22


PIG-3562 Implement combiner optimizations for DISTINCT in Tez

Review Request #16717 - Created Jan. 8, 2014 and updated

Alex Bain
tez
PIG-3562
Reviewers
pig
cheolsoo, daijy, mwagner, rohini
pig-git
Implement DISTINCT combiner optimizations in Tez

1. Use a combiner with normal uses of DISTINCT. In MR Pig, there are some global variables and a special DistinctCombiner class that throws away the duplicate tuples. We could hack this into Pig-on-Tez, but instead I just reused the reduce plan as the combiner plan, which does the same thing (through a POPackage->POProject->POForEach with the setDistinct property set to true).

I'm a little bit concerned that this combiner plan could somehow be slower than the special DistinctCombiner class, but I don't see how.

There is also a special CombinerPackager packager that I did NOT use for this. I think that packager is really intended for use with the algebraic UDF combiner optimizations only.

2. I carefully verified that DISTINCT nested inside a FOREACH code block is optimized by the CombinerOptimizer into an algebraic UDF version of DISTINCT. I added TestTezCompiler and e2e tests for this. Cheolsoo already made all the combiner changes for this to work correctly - I didn't make any code changes here.
Updated golden file for existing TestTezCompiler DISTINCT test to include combiner plan
Added TestTezCompiler test and golden file for DISTINCT algebraic udf combiner
Added e2e test that runs DISTINCT with algebraic udf combiner
I am getting some test-e2e-tez failures in ORDER BY tests, but I am also getting these in a clean Tez branch. My new e2e test passes.
src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
Revision a7de3a7 New Change
[20] 633 lines
[+20] [+] public void visitCross(POCross op) throws VisitorException {
634
        try {
634
        try {
635
            POLocalRearrange lr = localRearrangeFactory.create();
635
            POLocalRearrange lr = localRearrangeFactory.create();
636
            lr.setDistinct(true);
636
            lr.setDistinct(true);
637
            curTezOp.plan.addAsLeaf(lr);
637
            curTezOp.plan.addAsLeaf(lr);
638
            curTezOp.customPartitioner = op.getCustomPartitioner();
638
            curTezOp.customPartitioner = op.getCustomPartitioner();

    
   
639
            TezOperator lastOp = curTezOp;
639

    
   
640

   
640
            // Mark the start of a new TezOperator, connecting the inputs.
641
            // Mark the start of a new TezOperator, connecting the inputs.
641
            // TODO add distinct combiner as an optimization when supported by Tez

   
642
            blocking();
642
            blocking();
643

    
   
643

   

    
   
644
            // Add the DISTINCT plan as the combine plan. In MR Pig, the combiner is implemented

    
   
645
            // with a global variable and a specific DistinctCombiner class. This seems better.

    
   
646
            PhysicalPlan combinePlan = curTezOp.inEdges.get(lastOp.getOperatorKey()).combinePlan;

    
   
647
            addDistinctPlan(combinePlan, 1);

    
   
648

   

    
   
649
            POLocalRearrangeTez clr = localRearrangeFactory.create();

    
   
650
            clr.setDistinct(true);

    
   
651
            combinePlan.addAsLeaf(clr);

    
   
652

   

    
   
653
            addDistinctPlan(curTezOp.plan, op.getRequestedParallelism());
Moved from 658

    
   
654
            phyToTezOpMap.put(op, curTezOp);
Moved from 659

    
   
655
        } catch (Exception e) {
Moved from 660

    
   
656
            int errCode = 2034;
Moved from 661

    
   
657
            String msg = "Cannot compile " + op.getClass().getSimpleName();
Moved from 662

    
   
658
            throw new TezCompilerException(msg, errCode, PigException.BUG);
Moved from 663

    
   
659
        }

    
   
660
    }

    
   
661

   

    
   
662
    // Adds the plan for DISTINCT. Note that the PODistinct is not actually added to the plan, but

    
   
663
    // rather is implemented by the action of the local rearrange, shuffle and project operations.

    
   
664
    private void addDistinctPlan(PhysicalPlan plan, int rp) throws PlanException {
644
            POPackage pkg = getPackage(1, DataType.TUPLE);
665
        POPackage pkg = getPackage(1, DataType.TUPLE);
645
            pkg.getPkgr().setDistinct(true);
666
        pkg.getPkgr().setDistinct(true);
646
            curTezOp.plan.add(pkg);
667
        plan.addAsLeaf(pkg);
647

    
   
668

   
648
            POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
669
        POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
649
            project.setResultType(DataType.TUPLE);
670
        project.setResultType(DataType.TUPLE);
650
            project.setStar(false);
671
        project.setStar(false);
651
            project.setColumn(0);
672
        project.setColumn(0);
652
            project.setOverloaded(false);
673
        project.setOverloaded(false);
653

    
   
674

   
654
            // Note that the PODistinct is not actually added to any Tez vertex, but rather is
675
        POForEach forEach = getForEach(project, rp);
655
            // implemented by the action of the local rearrange, shuffle and project operations.
676
        plan.addAsLeaf(forEach);
656
            POForEach forEach = getForEach(project, op.getRequestedParallelism());

   
657
            curTezOp.plan.addAsLeaf(forEach);

   
658
            phyToTezOpMap.put(op, curTezOp);
Moved to 654

   
659
        } catch (Exception e) {
Moved to 655

   
660
            int errCode = 2034;
Moved to 656

   
661
            String msg = "Cannot compile " + op.getClass().getSimpleName();
Moved to 657

   
662
            throw new TezCompilerException(msg, errCode, PigException.BUG);
Moved to 658

   
663
        }
Moved to 659

   
664
    }
677
    }
665

    
   
678

   
666
    @Override
679
    @Override
667
    public void visitFilter(POFilter op) throws VisitorException {
680
    public void visitFilter(POFilter op) throws VisitorException {
668
        try {
681
        try {
[+20] [20] 939 lines
test/e2e/pig/tests/tez.conf
Revision 71cdcbc New Change
 
test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld
New File
 
test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
Revision 35d9313 New Change
 
test/org/apache/pig/tez/TestTezCompiler.java
Revision 2252531 New Change
 
  1. src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java: Loading...
  2. test/e2e/pig/tests/tez.conf: Loading...
  3. test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld: Loading...
  4. test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld: Loading...
  5. test/org/apache/pig/tez/TestTezCompiler.java: Loading...