Review Board 1.7.22


PIG-3555 Initial implementation of Tez combiner optimization

Review Request #15261 - Created Nov. 6, 2013 and submitted

Submitter: Cheolsoo Park
Branch: tez
Bugs: PIG-3555
Reviewers (groups): pig
Reviewers (people): abain, daijy, mwagner, rohini
Repository: pig-git
Initial implementation of the Tez combiner optimizer. The patch includes the following changes:
* Factored out the CombinerOptimizer code into a utility class called CombinerOptimizerUtil, so that both the MR and Tez CombinerOptimizers use it instead of duplicating code.
* Introduced a new class called TezEdgeDescriptor that holds combine plans as well as various edge properties (a rough sketch of its possible shape follows this list).
* Added TezEdgeDescriptors to TezOperator. Note that I added multiple descriptors for inbound edges but a single descriptor for all outbound edges, because TezDagBuilder always creates an edge by connecting predecessors to the current vertex. Please let me know if you think we should allow multiple descriptors for outbound edges too.
* Refactored some code in TezDagBuilder while touching it.
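The TezEdgeDescriptor source is not rendered below (its diff never loads), so the following is only a minimal sketch of the shape the description suggests; the field names and types are assumptions for illustration, not the patch's actual code.

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;

// Hypothetical sketch of TezEdgeDescriptor; field names and types are assumptions
// inferred from the review description, not copied from the patch.
public class TezEdgeDescriptor {
    // Combine plan that runs on this edge, playing the role that
    // MapReduceOper.combinePlan plays in the MR layer.
    public PhysicalPlan combinePlan = new PhysicalPlan();

    // Other per-edge properties that TezDagBuilder would consult when wiring the DAG,
    // e.g. which input/output classes to use on either end of the edge.
    public String inputClassName;
    public String outputClassName;
}

On TezOperator, the description then implies something like a map of inbound descriptors keyed by predecessor plus one shared outbound descriptor, which matches TezDagBuilder always creating edges from predecessors to the current vertex.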
ant test-tez passes.
ant test-e2e-tez passes.

I didn't add new test cases, but an e2e test case (Checkin_3) includes an algebraic UDF (COUNT) following a group-by. I also manually tested it on a live cluster.
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
Revision 18a382b New Change
(The first 14 lines of the file, the Apache license header, are collapsed in the diff viewer and unchanged.)

This change shrinks CombinerOptimizer.java from roughly 1,010 lines to 53: per the description above, the in-class implementation of the combiner optimization is deleted and now lives in the shared CombinerOptimizerUtil, and visitMROp() simply delegates to that utility. The file after the change reads:

 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;

/**
 * Optimize map reduce plans to use the combiner where possible.
 */
public class CombinerOptimizer extends MROpPlanVisitor {
    private CompilationMessageCollector messageCollector = null;
    private boolean doMapAgg;

    public CombinerOptimizer(MROperPlan plan, boolean doMapAgg) {
        this(plan, doMapAgg, new CompilationMessageCollector());
    }

    public CombinerOptimizer(MROperPlan plan, boolean doMapAgg,
            CompilationMessageCollector messageCollector) {
        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
        this.messageCollector = messageCollector;
        this.doMapAgg = doMapAgg;
    }

    public CompilationMessageCollector getMessageCollector() {
        return messageCollector;
    }

    @Override
    public void visitMROp(MapReduceOper mr) throws VisitorException {
        CombinerOptimizerUtil.addCombiner(mr.mapPlan, mr.reducePlan, mr.combinePlan, messageCollector, doMapAgg);
    }
}

Removed from the old file (old lines 20-1009), factored out into CombinerOptimizerUtil per the description above:
* the imports used only by the old implementation (collections, logging, configuration, and the physical-layer operator and plan classes);
* the long class javadoc explaining the optimization: algebraic functions and DISTINCT inside the nested plan of a foreach are partially computed in the map and combine phases by adding new foreach statements with the INITIAL and INTERMEDIATE forms of the algebraic functions to the map and combine plans; the combiner is not used when the bag portion of the group-by result is projected or a non-algebraic expression/UDF takes a bag as input, because the extra (de)serialization would outweigh the small reduction in data size; the listed enhancement areas were use of the combiner in cogroup, queries with order-by/limit/sort in a nested foreach after a group-by, and a group-by followed by a filter containing an algebraic expression;
* the DISTINCT_UDF_CLASSNAME constant and the log field;
* the body of the old visitMROp(), which pattern-matched a group-by-plus-foreach job (a single POLocalRearrange leaf in the map plan, a single POPackage root in the reduce plan followed by an optional POLimit and a POForEach), replaced PODistinct->Project[*] pairs with the algebraic Distinct built-in UDF, built new map and combine foreaches carrying the INITIAL and INTERMEDIATE forms of the functions, switched both package operators to POCombinerPackage, optionally inserted a POPartialAgg for in-map aggregation when doMapAgg is set, and rewired the map-side local rearranges;
* the private helpers createPartialAgg, findAlgebraicOps, getAlgebraicSuccessor, createForEachWithGrpProj, createPlanWithPredecessors, addPredecessorsToPlan, addAlgebraicFuncToCombineFE, patchUpMap, getPreCombinerLR, createOperatorKey, setProjectInput, changeFunc, and getNewRearrange;
* the inner visitor classes AlgebraicPlanChecker (checks that a nested foreach plan is combinable and locates a PODistinct that can be treated as algebraic), DistinctPatcher (replaces a Project[bag][*] over a PODistinct with POUserFunc(org.apache.pig.builtin.Distinct)), and fixMapProjects (flags bag-typed projects in the map foreach to use single-tuple bags).
src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java
Revision e69de29 New Change
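The new Tez-side optimizer's contents are not shown (its diff never loads). Below is only a rough sketch, assuming it mirrors the MR-side class above and delegates to the shared utility; everything except the shape of the CombinerOptimizerUtil.addCombiner call is an assumption (the visitor base class, visitTezOp, and the TezOperator/TezEdgeDescriptor fields).

import java.util.Map;

import org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;

// Rough sketch only, not the actual patch: the Tez-side CombinerOptimizer is assumed
// to be a plan visitor that hands each group-by edge to the shared utility.
public class CombinerOptimizer extends TezOpPlanVisitor {
    private final CompilationMessageCollector messageCollector;
    private final boolean doMapAgg;

    public CombinerOptimizer(TezOperPlan plan, boolean doMapAgg,
            CompilationMessageCollector messageCollector) {
        super(plan, new DepthFirstWalker<TezOperator, TezOperPlan>(plan));
        this.messageCollector = messageCollector;
        this.doMapAgg = doMapAgg;
    }

    @Override
    public void visitTezOp(TezOperator op) throws VisitorException {
        // For each inbound edge, treat the predecessor's plan as the "map" plan and
        // this vertex's plan as the "reduce" plan, storing the combine plan on the
        // edge descriptor (an inEdges map keyed by predecessor is an assumption).
        for (Map.Entry<OperatorKey, TezEdgeDescriptor> e : op.inEdges.entrySet()) {
            TezOperator pred = mPlan.getOperator(e.getKey());
            CombinerOptimizerUtil.addCombiner(pred.plan, op.plan,
                    e.getValue().combinePlan, messageCollector, doMapAgg);
        }
    }
}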
 
src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
Revision 0b1f3c9 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Revision 45e47b0 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
Revision e69de29 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
Revision 3f14644 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
Revision e612d88 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
Revision 5a42ded New Change
 
src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
Revision e69de29 New Change
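This new utility's diff also never loads. From the call site shown in the MR-side visitMROp() above, its entry point presumably looks roughly like the skeleton below; the signature is inferred from that call, the body outline paraphrases the logic removed from the MR CombinerOptimizer, and none of it is the actual file text.

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.VisitorException;

// Inferred skeleton of the shared utility; the signature is deduced from the call
// CombinerOptimizerUtil.addCombiner(mr.mapPlan, mr.reducePlan, mr.combinePlan,
// messageCollector, doMapAgg) in the MR-side CombinerOptimizer.
public class CombinerOptimizerUtil {

    private CombinerOptimizerUtil() {
        // static utility class
    }

    public static void addCombiner(PhysicalPlan mapPlan, PhysicalPlan reducePlan,
            PhysicalPlan combinePlan, CompilationMessageCollector messageCollector,
            boolean doMapAgg) throws VisitorException {
        // 1. Check the plan shape: a single POLocalRearrange leaf in the map plan and a
        //    single POPackage root (optionally followed by POLimit) plus a POForEach in
        //    the reduce plan.
        // 2. Collect algebraic UDFs (and a combinable DISTINCT, if any) from the
        //    foreach's inner plans; bail out if anything non-algebraic is found.
        // 3. Build map/combine foreaches with the INITIAL/INTERMEDIATE function forms,
        //    switch the packages to POCombinerPackage, and, when doMapAgg is set,
        //    insert a POPartialAgg for in-map aggregation.
    }
}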
 
test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld
Revision 925f07e New Change
 
test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld
Revision a3974fe New Change
 
test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld
Revision a8c942b New Change
 
test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld
Revision fb7c903 New Change
 
test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
Revision e6cd25e New Change
 