Review Board 1.7.22


Allow UDFs to flatten themselves

Review Request #9060 - Created Jan. 23, 2013 and updated

Jonathan Coveney
PIG-3010
Reviewers
pig
pig-git
see PIG-3010

 

Diff revision 3 (Latest)

1 2 3
1 2 3

  1. src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java: Loading...
  2. src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java: Loading...
  3. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java: Loading...
  4. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java: Loading...
  5. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java: Loading...
  6. src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java: Loading...
  7. src/org/apache/pig/builtin/FlattenOutput.java: Loading...
  8. src/org/apache/pig/builtin/UdfFlatten.java: Loading...
  9. src/org/apache/pig/newplan/logical/Util.java: Loading...
  10. src/org/apache/pig/newplan/logical/relational/LOGenerate.java: Loading...
  11. src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java: Loading...
  12. src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java: Loading...
  13. src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java: Loading...
  14. src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java: Loading...
  15. src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java: Loading...
  16. src/org/apache/pig/newplan/logical/rules/MergeForEach.java: Loading...
  17. src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java: Loading...
  18. src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java: Loading...
  19. src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java: Loading...
  20. src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java: Loading...
  21. src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java: Loading...
  22. src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java: Loading...
  23. src/org/apache/pig/parser/LogicalPlanBuilder.java: Loading...
  24. src/org/apache/pig/parser/LogicalPlanGenerator.g: Loading...
  25. test/org/apache/pig/test/OptimizeLimitPlanPrinter.java: Loading...
  26. test/org/apache/pig/test/TestExampleGenerator.java: Loading...
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
Revision eddffad New Change
[20] 83 lines
[+20]
84
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
84
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
85
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
85
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
86
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
86
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
87
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
87
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
88
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
88
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;

    
   
89
import org.apache.pig.builtin.FlattenOutput.FlattenStates;
89
import org.apache.pig.data.DataType;
90
import org.apache.pig.data.DataType;
90
import org.apache.pig.impl.PigContext;
91
import org.apache.pig.impl.PigContext;
91
import org.apache.pig.impl.builtin.DefaultIndexableLoader;
92
import org.apache.pig.impl.builtin.DefaultIndexableLoader;
92
import org.apache.pig.impl.builtin.FindQuantiles;
93
import org.apache.pig.impl.builtin.FindQuantiles;
93
import org.apache.pig.impl.builtin.GetMemNumRows;
94
import org.apache.pig.impl.builtin.GetMemNumRows;
[+20] [20] 19 lines
[+20]
113
import org.apache.pig.impl.util.Utils;
114
import org.apache.pig.impl.util.Utils;
114
import org.apache.pig.newplan.logical.relational.LOJoin;
115
import org.apache.pig.newplan.logical.relational.LOJoin;
115

    
   
116

   
116
/**
117
/**
117
 * The compiler that compiles a given physical plan
118
 * The compiler that compiles a given physical plan
118
 * into a DAG of MapReduce operators which can then 
119
 * into a DAG of MapReduce operators which can then
119
 * be converted into the JobControl structure.
120
 * be converted into the JobControl structure.
120
 * 
121
 *
121
 * Is implemented as a visitor of the PhysicalPlan it
122
 * Is implemented as a visitor of the PhysicalPlan it
122
 * is compiling.
123
 * is compiling.
123
 * 
124
 *
124
 * Currently supports all operators except the MR Sort
125
 * Currently supports all operators except the MR Sort
125
 * operator 
126
 * operator
126
 * 
127
 *
127
 * Uses a predecessor based depth first traversal. 
128
 * Uses a predecessor based depth first traversal.
128
 * To compile an operator, first compiles
129
 * To compile an operator, first compiles
129
 * the predecessors into MapReduce Operators and tries to
130
 * the predecessors into MapReduce Operators and tries to
130
 * merge the current operator into one of them. The goal
131
 * merge the current operator into one of them. The goal
131
 * being to keep the number of MROpers to a minimum.
132
 * being to keep the number of MROpers to a minimum.
132
 * 
133
 *
133
 * It also merges multiple Map jobs, created by compiling
134
 * It also merges multiple Map jobs, created by compiling
134
 * the inputs individually, into a single job. Here a new
135
 * the inputs individually, into a single job. Here a new
135
 * map job is created and then the contents of the previous
136
 * map job is created and then the contents of the previous
136
 * map plans are added. However, any other state that was in
137
 * map plans are added. However, any other state that was in
137
 * the previous map plans, should be manually moved over. So,
138
 * the previous map plans, should be manually moved over. So,
138
 * if you are adding something new take care about this.
139
 * if you are adding something new take care about this.
139
 * Ex of this is in requestedParallelism
140
 * Ex of this is in requestedParallelism
140
 * 
141
 *
141
 * Only in case of blocking operators and splits, a new 
142
 * Only in case of blocking operators and splits, a new
142
 * MapReduce operator is started using a store-load combination
143
 * MapReduce operator is started using a store-load combination
143
 * to connect the two operators. Whenever this happens
144
 * to connect the two operators. Whenever this happens
144
 * care is taken to add the MROper into the MRPlan and connect it
145
 * care is taken to add the MROper into the MRPlan and connect it
145
 * appropriately.
146
 * appropriately.
146
 * 

   
147
 *
147
 *

    
   
148
 *
148
 */
149
 */
149
public class MRCompiler extends PhyPlanVisitor {
150
public class MRCompiler extends PhyPlanVisitor {
150
    PigContext pigContext;
151
    PigContext pigContext;
151
    
152

   
152
    //The plan that is being compiled
153
    //The plan that is being compiled
153
    PhysicalPlan plan;
154
    PhysicalPlan plan;
154

    
   
155

   
155
    //The plan of MapReduce Operators
156
    //The plan of MapReduce Operators
156
    MROperPlan MRPlan;
157
    MROperPlan MRPlan;
157
    
158

   
158
    //The current MapReduce Operator
159
    //The current MapReduce Operator
159
    //that is being compiled
160
    //that is being compiled
160
    MapReduceOper curMROp;
161
    MapReduceOper curMROp;
161
    
162

   
162
    //The output of compiling the inputs
163
    //The output of compiling the inputs
163
    MapReduceOper[] compiledInputs = null;
164
    MapReduceOper[] compiledInputs = null;
164

    
   
165

   
165
    //The split operators seen till now. If not
166
    //The split operators seen till now. If not
166
    //maintained they will haunt you.
167
    //maintained they will haunt you.
167
    //During the traversal a split is the only
168
    //During the traversal a split is the only
168
    //operator that can be revisited from a different
169
    //operator that can be revisited from a different
169
    //path. So this map stores the split job. So 
170
    //path. So this map stores the split job. So
170
    //whenever we hit the split, we create a new MROper
171
    //whenever we hit the split, we create a new MROper
171
    //and connect the split job using load-store and also
172
    //and connect the split job using load-store and also
172
    //in the MRPlan
173
    //in the MRPlan
173
    Map<OperatorKey, MapReduceOper> splitsSeen;
174
    Map<OperatorKey, MapReduceOper> splitsSeen;
174
    
175

   
175
    NodeIdGenerator nig;
176
    NodeIdGenerator nig;
176

    
   
177

   
177
    private String scope;
178
    private String scope;
178
    
179

   
179
    private Random r;
180
    private Random r;
180
    
181

   
181
    private UDFFinder udfFinder;
182
    private UDFFinder udfFinder;
182
    
183

   
183
    private CompilationMessageCollector messageCollector = null;
184
    private CompilationMessageCollector messageCollector = null;
184
    
185

   
185
    private Map<PhysicalOperator,MapReduceOper> phyToMROpMap;
186
    private Map<PhysicalOperator,MapReduceOper> phyToMROpMap;
186
        
187

   
187
    public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
188
    public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
188
   
189

   
189
    private static final Log LOG = LogFactory.getLog(MRCompiler.class);
190
    private static final Log LOG = LogFactory.getLog(MRCompiler.class);
190
    
191

   
191
    public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
192
    public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
192
    public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation";
193
    public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation";
193
    
194

   
194
    private int fileConcatenationThreshold = 100;
195
    private int fileConcatenationThreshold = 100;
195
    private boolean optimisticFileConcatenation = false;
196
    private boolean optimisticFileConcatenation = false;
196
    
197

   
197
    public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
198
    public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
198
        this(plan,null);
199
        this(plan,null);
199
    }
200
    }
200
    
201

   
201
    public MRCompiler(PhysicalPlan plan,
202
    public MRCompiler(PhysicalPlan plan,
202
            PigContext pigContext) throws MRCompilerException {
203
            PigContext pigContext) throws MRCompilerException {
203
        super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
204
        super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
204
        this.plan = plan;
205
        this.plan = plan;
205
        this.pigContext = pigContext;
206
        this.pigContext = pigContext;
[+20] [20] 10 lines
[+20] public class MRCompiler extends PhyPlanVisitor {
216
        	throw new MRCompilerException(msg, errCode, PigException.BUG);
217
        	throw new MRCompilerException(msg, errCode, PigException.BUG);
217
        }
218
        }
218
        scope = roots.get(0).getOperatorKey().getScope();
219
        scope = roots.get(0).getOperatorKey().getScope();
219
        messageCollector = new CompilationMessageCollector() ;
220
        messageCollector = new CompilationMessageCollector() ;
220
        phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>();
221
        phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>();
221
        
222

   
222
        fileConcatenationThreshold = Integer.parseInt(pigContext.getProperties()
223
        fileConcatenationThreshold = Integer.parseInt(pigContext.getProperties()
223
                .getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
224
                .getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
224
        optimisticFileConcatenation = pigContext.getProperties().getProperty(
225
        optimisticFileConcatenation = pigContext.getProperties().getProperty(
225
                OPTIMISTIC_FILE_CONCATENATION, "false").equals("true");
226
                OPTIMISTIC_FILE_CONCATENATION, "false").equals("true");
226
        LOG.info("File concatenation threshold: " + fileConcatenationThreshold
227
        LOG.info("File concatenation threshold: " + fileConcatenationThreshold
227
                + " optimistic? " + optimisticFileConcatenation);
228
                + " optimistic? " + optimisticFileConcatenation);
228
    }
229
    }
229
    
230

   
230
    public void aggregateScalarsFiles() throws PlanException, IOException {
231
    public void aggregateScalarsFiles() throws PlanException, IOException {
231
        List<MapReduceOper> mrOpList = new ArrayList<MapReduceOper>();
232
        List<MapReduceOper> mrOpList = new ArrayList<MapReduceOper>();
232
        for(MapReduceOper mrOp: MRPlan) {
233
        for(MapReduceOper mrOp: MRPlan) {
233
            mrOpList.add(mrOp);
234
            mrOpList.add(mrOp);
234
        }
235
        }
235
        
236

   
236
        Configuration conf = 
237
        Configuration conf =
237
            ConfigurationUtil.toConfiguration(pigContext.getProperties());
238
            ConfigurationUtil.toConfiguration(pigContext.getProperties());
238
        boolean combinable = !conf.getBoolean("pig.noSplitCombination", false);
239
        boolean combinable = !conf.getBoolean("pig.noSplitCombination", false);
239
        
240

   
240
        Map<FileSpec, MapReduceOper> seen = new HashMap<FileSpec, MapReduceOper>();
241
        Map<FileSpec, MapReduceOper> seen = new HashMap<FileSpec, MapReduceOper>();
241
        
242

   
242
        for(MapReduceOper mrOp: mrOpList) {
243
        for(MapReduceOper mrOp: mrOpList) {
243
            for(PhysicalOperator scalar: mrOp.scalars) {                
244
            for(PhysicalOperator scalar: mrOp.scalars) {
244
                MapReduceOper mro = phyToMROpMap.get(scalar);
245
                MapReduceOper mro = phyToMROpMap.get(scalar);
245
                if (scalar instanceof POStore) {                                     
246
                if (scalar instanceof POStore) {
246
                    FileSpec oldSpec = ((POStore)scalar).getSFile();
247
                    FileSpec oldSpec = ((POStore)scalar).getSFile();
247
                    MapReduceOper mro2 = seen.get(oldSpec);
248
                    MapReduceOper mro2 = seen.get(oldSpec);
248
                    boolean hasSeen = false;
249
                    boolean hasSeen = false;
249
                    if (mro2 != null) {
250
                    if (mro2 != null) {
250
                        hasSeen = true;
251
                        hasSeen = true;
251
                        mro = mro2;
252
                        mro = mro2;
252
                    }
253
                    }
253
                    if (!hasSeen
254
                    if (!hasSeen
254
                            && combinable
255
                            && combinable
255
                            && (mro.reducePlan.isEmpty() ? hasTooManyInputFiles(mro, conf)
256
                            && (mro.reducePlan.isEmpty() ? hasTooManyInputFiles(mro, conf)
256
                                    : (mro.requestedParallelism >= fileConcatenationThreshold))) {
257
                                    : (mro.requestedParallelism >= fileConcatenationThreshold))) {
257
                        PhysicalPlan pl = mro.reducePlan.isEmpty() ? mro.mapPlan : mro.reducePlan;
258
                        PhysicalPlan pl = mro.reducePlan.isEmpty() ? mro.mapPlan : mro.reducePlan;
258
                        FileSpec newSpec = getTempFileSpec();
259
                        FileSpec newSpec = getTempFileSpec();
259
                        
260

   
260
                        // replace oldSpec in mro with newSpec
261
                        // replace oldSpec in mro with newSpec
261
                        new FindStoreNameVisitor(pl, newSpec, oldSpec).visit();
262
                        new FindStoreNameVisitor(pl, newSpec, oldSpec).visit();
262
                        
263

   
263
                        POStore newSto = getStore();
264
                        POStore newSto = getStore();
264
                        newSto.setSFile(oldSpec);
265
                        newSto.setSFile(oldSpec);
265
                        if (MRPlan.getPredecessors(mrOp)!=null && 
266
                        if (MRPlan.getPredecessors(mrOp)!=null &&
266
                                MRPlan.getPredecessors(mrOp).contains(mro))
267
                                MRPlan.getPredecessors(mrOp).contains(mro))
267
                            MRPlan.disconnect(mro, mrOp);
268
                            MRPlan.disconnect(mro, mrOp);
268
                        MapReduceOper catMROp = getConcatenateJob(newSpec, mro, newSto); 
269
                        MapReduceOper catMROp = getConcatenateJob(newSpec, mro, newSto);
269
                        MRPlan.connect(catMROp, mrOp);   
270
                        MRPlan.connect(catMROp, mrOp);
270
                        seen.put(oldSpec, catMROp);
271
                        seen.put(oldSpec, catMROp);
271
                    } else {
272
                    } else {
272
                        if (!hasSeen) seen.put(oldSpec, mro);
273
                        if (!hasSeen) seen.put(oldSpec, mro);
273
                    }
274
                    }
274
                }
275
                }
275
            }
276
            }
276
        }
277
        }
277
    }
278
    }
278
    
279

   
279
    public void randomizeFileLocalizer(){
280
    public void randomizeFileLocalizer(){
280
        FileLocalizer.setR(new Random());
281
        FileLocalizer.setR(new Random());
281
    }
282
    }
282
    
283

   
283
    /**
284
    /**
284
     * Used to get the compiled plan
285
     * Used to get the compiled plan
285
     * @return map reduce plan built by the compiler
286
     * @return map reduce plan built by the compiler
286
     */
287
     */
287
    public MROperPlan getMRPlan() {
288
    public MROperPlan getMRPlan() {
288
        return MRPlan;
289
        return MRPlan;
289
    }
290
    }
290
    
291

   
291
    /**
292
    /**
292
     * Used to get the plan that was compiled
293
     * Used to get the plan that was compiled
293
     * @return physical plan
294
     * @return physical plan
294
     */
295
     */
295
    @Override
296
    @Override
296
    public PhysicalPlan getPlan() {
297
    public PhysicalPlan getPlan() {
297
        return plan;
298
        return plan;
298
    }
299
    }
299
    
300

   
300
    public CompilationMessageCollector getMessageCollector() {
301
    public CompilationMessageCollector getMessageCollector() {
301
    	return messageCollector;
302
    	return messageCollector;
302
    }
303
    }
303
    
304

   
304
    /**
305
    /**
305
     * The front-end method that the user calls to compile
306
     * The front-end method that the user calls to compile
306
     * the plan. Assumes that all submitted plans have a Store
307
     * the plan. Assumes that all submitted plans have a Store
307
     * operators as the leaf.
308
     * operators as the leaf.
308
     * @return A map reduce plan
309
     * @return A map reduce plan
[+20] [20] 26 lines
[+20] [+] public MROperPlan compile() throws IOException, PlanException, VisitorException {
335
            ops = new ArrayList<PhysicalOperator>(leaves.size() + nativeMRs.size());
336
            ops = new ArrayList<PhysicalOperator>(leaves.size() + nativeMRs.size());
336
            ops.addAll(leaves);
337
            ops.addAll(leaves);
337
        }
338
        }
338
        ops.addAll(nativeMRs);
339
        ops.addAll(nativeMRs);
339
        Collections.sort(ops);
340
        Collections.sort(ops);
340
        
341

   
341
        for (PhysicalOperator op : ops) {
342
        for (PhysicalOperator op : ops) {
342
            compile(op);
343
            compile(op);
343
        }
344
        }
344
        
345

   
345
        connectSoftLink();
346
        connectSoftLink();
346
        
347

   
347
        return MRPlan;
348
        return MRPlan;
348
    }
349
    }
349
    
350

   
350
    public void connectSoftLink() throws PlanException, IOException {
351
    public void connectSoftLink() throws PlanException, IOException {
351
        for (PhysicalOperator op : plan) {
352
        for (PhysicalOperator op : plan) {
352
            if (plan.getSoftLinkPredecessors(op)!=null) {
353
            if (plan.getSoftLinkPredecessors(op)!=null) {
353
                for (PhysicalOperator pred : plan.getSoftLinkPredecessors(op)) {
354
                for (PhysicalOperator pred : plan.getSoftLinkPredecessors(op)) {
354
                    MapReduceOper from = phyToMROpMap.get(pred);
355
                    MapReduceOper from = phyToMROpMap.get(pred);
[+20] [20] 5 lines
[+20] public void connectSoftLink() throws PlanException, IOException {
360
                    }
361
                    }
361
                }
362
                }
362
            }
363
            }
363
        }
364
        }
364
    }
365
    }
365
    
366

   
366
    /**
367
    /**
367
     * Compiles the plan below op into a MapReduce Operator
368
     * Compiles the plan below op into a MapReduce Operator
368
     * and stores it in curMROp.
369
     * and stores it in curMROp.
369
     * @param op
370
     * @param op
370
     * @throws IOException
371
     * @throws IOException
371
     * @throws PlanException
372
     * @throws PlanException
372
     * @throws VisitorException
373
     * @throws VisitorException
373
     */
374
     */
374
    private void compile(PhysicalOperator op) throws IOException,
375
    private void compile(PhysicalOperator op) throws IOException,
375
    PlanException, VisitorException {
376
    PlanException, VisitorException {
376
        //An artifact of the Visitor. Need to save
377
        //An artifact of the Visitor. Need to save
377
        //this so that it is not overwritten.
378
        //this so that it is not overwritten.
378
        MapReduceOper[] prevCompInp = compiledInputs;
379
        MapReduceOper[] prevCompInp = compiledInputs;
379
        
380

   
380
        //Compile each predecessor into the MROper and 
381
        //Compile each predecessor into the MROper and
381
        //store them away so that we can use them for compiling
382
        //store them away so that we can use them for compiling
382
        //op.
383
        //op.
383
        List<PhysicalOperator> predecessors = plan.getPredecessors(op);
384
        List<PhysicalOperator> predecessors = plan.getPredecessors(op);
384
        if(op instanceof PONative){
385
        if(op instanceof PONative){
385
            // the predecessor (store) has already been processed
386
            // the predecessor (store) has already been processed
[+20] [20] 17 lines
[+20] private void compile(PhysicalOperator op) throws IOException,
403
                }
404
                }
404

    
   
405

   
405
                PhysicalOperator p = predecessors.get(0);
406
                PhysicalOperator p = predecessors.get(0);
406
                MapReduceOper oper = null;
407
                MapReduceOper oper = null;
407
                if(p instanceof POStore || p instanceof PONative){
408
                if(p instanceof POStore || p instanceof PONative){
408
                    oper = phyToMROpMap.get(p); 
409
                    oper = phyToMROpMap.get(p);
409
                }else{
410
                }else{
410
                    int errCode = 2126;
411
                    int errCode = 2126;
411
                    String msg = "Predecessor of load should be a store or mapreduce operator. Got "+p.getClass();
412
                    String msg = "Predecessor of load should be a store or mapreduce operator. Got "+p.getClass();
412
                    throw new PlanException(msg, errCode, PigException.BUG);
413
                    throw new PlanException(msg, errCode, PigException.BUG);
413
                }
414
                }
[+20] [20] 6 lines
[+20] private void compile(PhysicalOperator op) throws IOException,
420
                plan.disconnect(op, p);
421
                plan.disconnect(op, p);
421
                MRPlan.connect(oper, curMROp);
422
                MRPlan.connect(oper, curMROp);
422
                phyToMROpMap.put(op, curMROp);
423
                phyToMROpMap.put(op, curMROp);
423
                return;
424
                return;
424
            }
425
            }
425
            
426

   
426
            Collections.sort(predecessors);
427
            Collections.sort(predecessors);
427
            compiledInputs = new MapReduceOper[predecessors.size()];
428
            compiledInputs = new MapReduceOper[predecessors.size()];
428
            int i = -1;
429
            int i = -1;
429
            for (PhysicalOperator pred : predecessors) {
430
            for (PhysicalOperator pred : predecessors) {
430
                if(pred instanceof POSplit && splitsSeen.containsKey(pred.getOperatorKey())){
431
                if(pred instanceof POSplit && splitsSeen.containsKey(pred.getOperatorKey())){
[+20] [20] 16 lines
[+20] private void compile(PhysicalOperator op) throws IOException,
447
            }
448
            }
448
            MRPlan.add(curMROp);
449
            MRPlan.add(curMROp);
449
            phyToMROpMap.put(op, curMROp);
450
            phyToMROpMap.put(op, curMROp);
450
            return;
451
            return;
451
        }
452
        }
452
        
453

   
453
        //Now we have the inputs compiled. Do something
454
        //Now we have the inputs compiled. Do something
454
        //with the input oper op.
455
        //with the input oper op.
455
        op.visit(this);
456
        op.visit(this);
456
        if(op.getRequestedParallelism() > curMROp.requestedParallelism ) {
457
        if(op.getRequestedParallelism() > curMROp.requestedParallelism ) {
457
        	// we don't want to change prallelism for skewed join due to sampling
458
        	// we don't want to change prallelism for skewed join due to sampling
458
        	// and pre-allocated reducers for skewed keys
459
        	// and pre-allocated reducers for skewed keys
459
        	if (!curMROp.isSkewedJoin()) {
460
        	if (!curMROp.isSkewedJoin()) {
460
        		curMROp.requestedParallelism = op.getRequestedParallelism();
461
        		curMROp.requestedParallelism = op.getRequestedParallelism();
461
        	}
462
        	}
462
        }
463
        }
463
        compiledInputs = prevCompInp;
464
        compiledInputs = prevCompInp;
464
    }
465
    }
465
    
466

   
466
    private MapReduceOper getMROp(){
467
    private MapReduceOper getMROp(){
467
        return new MapReduceOper(new OperatorKey(scope,nig.getNextNodeId(scope)));
468
        return new MapReduceOper(new OperatorKey(scope,nig.getNextNodeId(scope)));
468
    }
469
    }
469
    
470

   
470
    private NativeMapReduceOper getNativeMROp(String mrJar, String[] parameters) {
471
    private NativeMapReduceOper getNativeMROp(String mrJar, String[] parameters) {
471
        return new NativeMapReduceOper(new OperatorKey(scope,nig.getNextNodeId(scope)), mrJar, parameters);
472
        return new NativeMapReduceOper(new OperatorKey(scope,nig.getNextNodeId(scope)), mrJar, parameters);
472
    }
473
    }
473
    
474

   
474
    private POLoad getLoad(){
475
    private POLoad getLoad(){
475
        POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
476
        POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
476
        ld.setPc(pigContext);
477
        ld.setPc(pigContext);
477
        return ld;
478
        return ld;
478
    }
479
    }
479
    
480

   
480
    private POStore getStore(){
481
    private POStore getStore(){
481
        POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
482
        POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
482
        // mark store as tmp store. These could be removed by the
483
        // mark store as tmp store. These could be removed by the
483
        // optimizer, because it wasn't the user requesting it.
484
        // optimizer, because it wasn't the user requesting it.
484
        st.setIsTmpStore(true);
485
        st.setIsTmpStore(true);
485
        return st;
486
        return st;
486
    }
487
    }
487
    
488

   
488
    /**
489
    /**
489
     * A map MROper is an MROper whose map plan is still open
490
     * A map MROper is an MROper whose map plan is still open
490
     * for taking more non-blocking operators.
491
     * for taking more non-blocking operators.
491
     * A reduce MROper is an MROper whose map plan is done but
492
     * A reduce MROper is an MROper whose map plan is done but
492
     * the reduce plan is open for taking more non-blocking opers.
493
     * the reduce plan is open for taking more non-blocking opers.
493
     * 
494
     *
494
     * Used for compiling non-blocking operators. The logic here
495
     * Used for compiling non-blocking operators. The logic here
495
     * is simple. If there is a single input, just push the operator
496
     * is simple. If there is a single input, just push the operator
496
     * into whichever phase is open. Otherwise, we merge the compiled
497
     * into whichever phase is open. Otherwise, we merge the compiled
497
     * inputs into a list of MROpers where the first oper is the merged
498
     * inputs into a list of MROpers where the first oper is the merged
498
     * oper consisting of all map MROpers and the rest are reduce MROpers
499
     * oper consisting of all map MROpers and the rest are reduce MROpers
499
     * as reduce plans can't be merged.
500
     * as reduce plans can't be merged.
500
     * Then we add the input oper op into the merged map MROper's map plan
501
     * Then we add the input oper op into the merged map MROper's map plan
501
     * as a leaf and connect the reduce MROpers using store-load combinations
502
     * as a leaf and connect the reduce MROpers using store-load combinations
502
     * to the input operator which is the leaf. Also care is taken to 
503
     * to the input operator which is the leaf. Also care is taken to
503
     * connect the MROpers according to the dependencies.
504
     * connect the MROpers according to the dependencies.
504
     * @param op
505
     * @param op
505
     * @throws PlanException
506
     * @throws PlanException
506
     * @throws IOException
507
     * @throws IOException
507
     */
508
     */
508
    private void nonBlocking(PhysicalOperator op) throws PlanException, IOException{
509
    private void nonBlocking(PhysicalOperator op) throws PlanException, IOException{
509
        
510

   
510
        if (compiledInputs.length == 1) {
511
        if (compiledInputs.length == 1) {
511
            //For speed
512
            //For speed
512
            MapReduceOper mro = compiledInputs[0];
513
            MapReduceOper mro = compiledInputs[0];
513
            if (!mro.isMapDone()) {
514
            if (!mro.isMapDone()) {
514
                mro.mapPlan.addAsLeaf(op);
515
                mro.mapPlan.addAsLeaf(op);
515
            } else if (mro.isMapDone() && !mro.isReduceDone()) {
516
            } else if (mro.isMapDone() && !mro.isReduceDone()) {
516
                mro.reducePlan.addAsLeaf(op);
517
                mro.reducePlan.addAsLeaf(op);
517
            } else {
518
            } else {
518
                int errCode = 2022;
519
                int errCode = 2022;
519
                String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";                
520
                String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
520
                throw new PlanException(msg, errCode, PigException.BUG);
521
                throw new PlanException(msg, errCode, PigException.BUG);
521
            }
522
            }
522
            curMROp = mro;
523
            curMROp = mro;
523
        } else {
524
        } else {
524
            List<MapReduceOper> mergedPlans = merge(compiledInputs);
525
            List<MapReduceOper> mergedPlans = merge(compiledInputs);
525
            
526

   
526
            //The first MROper is always the merged map MROper
527
            //The first MROper is always the merged map MROper
527
            MapReduceOper mro = mergedPlans.remove(0);
528
            MapReduceOper mro = mergedPlans.remove(0);
528
            //Push the input operator into the merged map MROper
529
            //Push the input operator into the merged map MROper
529
            mro.mapPlan.addAsLeaf(op);
530
            mro.mapPlan.addAsLeaf(op);
530
            
531

   
531
            //Connect all the reduce MROpers
532
            //Connect all the reduce MROpers
532
            if(mergedPlans.size()>0)
533
            if(mergedPlans.size()>0)
533
                connRedOper(mergedPlans, mro);
534
                connRedOper(mergedPlans, mro);
534
            
535

   
535
            //return the compiled MROper
536
            //return the compiled MROper
536
            curMROp = mro;
537
            curMROp = mro;
537
        }
538
        }
538
    }
539
    }
539
    
540

   
540
    private void addToMap(PhysicalOperator op) throws PlanException, IOException{
541
    private void addToMap(PhysicalOperator op) throws PlanException, IOException{
541
        
542

   
542
        if (compiledInputs.length == 1) {
543
        if (compiledInputs.length == 1) {
543
            //For speed
544
            //For speed
544
            MapReduceOper mro = compiledInputs[0];
545
            MapReduceOper mro = compiledInputs[0];
545
            if (!mro.isMapDone()) {
546
            if (!mro.isMapDone()) {
546
                mro.mapPlan.addAsLeaf(op);
547
                mro.mapPlan.addAsLeaf(op);
547
            } else if (mro.isMapDone() && !mro.isReduceDone()) {
548
            } else if (mro.isMapDone() && !mro.isReduceDone()) {
548
                FileSpec fSpec = getTempFileSpec();
549
                FileSpec fSpec = getTempFileSpec();
549
                
550

   
550
                POStore st = getStore();
551
                POStore st = getStore();
551
                st.setSFile(fSpec);
552
                st.setSFile(fSpec);
552
                mro.reducePlan.addAsLeaf(st);
553
                mro.reducePlan.addAsLeaf(st);
553
                mro.setReduceDone(true);
554
                mro.setReduceDone(true);
554
                mro = startNew(fSpec, mro);
555
                mro = startNew(fSpec, mro);
555
                mro.mapPlan.addAsLeaf(op);
556
                mro.mapPlan.addAsLeaf(op);
556
                compiledInputs[0] = mro;
557
                compiledInputs[0] = mro;
557
            } else {
558
            } else {
558
                int errCode = 2022;
559
                int errCode = 2022;
559
                String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";                
560
                String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
560
                throw new PlanException(msg, errCode, PigException.BUG);
561
                throw new PlanException(msg, errCode, PigException.BUG);
561
            }
562
            }
562
            curMROp = mro;
563
            curMROp = mro;
563
        } else {
564
        } else {
564
            List<MapReduceOper> mergedPlans = merge(compiledInputs);
565
            List<MapReduceOper> mergedPlans = merge(compiledInputs);
565
            
566

   
566
            //The first MROper is always the merged map MROper
567
            //The first MROper is always the merged map MROper
567
            MapReduceOper mro = mergedPlans.remove(0);
568
            MapReduceOper mro = mergedPlans.remove(0);
568
            //Push the input operator into the merged map MROper
569
            //Push the input operator into the merged map MROper
569
            mro.mapPlan.addAsLeaf(op);
570
            mro.mapPlan.addAsLeaf(op);
570
            
571

   
571
            //Connect all the reduce MROpers
572
            //Connect all the reduce MROpers
572
            if(mergedPlans.size()>0)
573
            if(mergedPlans.size()>0)
573
                connRedOper(mergedPlans, mro);
574
                connRedOper(mergedPlans, mro);
574
            
575

   
575
            //return the compiled MROper
576
            //return the compiled MROper
576
            curMROp = mro;
577
            curMROp = mro;
577
        }
578
        }
578
    }
579
    }
579
    
580

   
580
    /**
581
    /**
581
     * Used for compiling blocking operators. If there is a single input
582
     * Used for compiling blocking operators. If there is a single input
582
     * and its map phase is still open, then close it so that further
583
     * and its map phase is still open, then close it so that further
583
     * operators can be compiled into the reduce phase. If its reduce phase
584
     * operators can be compiled into the reduce phase. If its reduce phase
584
     * is open, add a store and close it. Start a new map MROper into which
585
     * is open, add a store and close it. Start a new map MROper into which
585
     * further operators can be compiled into. 
586
     * further operators can be compiled into.
586
     * 
587
     *
587
     * If there are multiple inputs, the logic 
588
     * If there are multiple inputs, the logic
588
     * is to merge all map MROpers into one map MROper and retain
589
     * is to merge all map MROpers into one map MROper and retain
589
     * the reduce MROpers. Since the operator is blocking, it has
590
     * the reduce MROpers. Since the operator is blocking, it has
590
     * to be a Global Rerrange at least now. This operator need not
591
     * to be a Global Rerrange at least now. This operator need not
591
     * be inserted into our plan as it is implemented by hadoop.
592
     * be inserted into our plan as it is implemented by hadoop.
592
     * But this creates the map-reduce boundary. So the merged map MROper
593
     * But this creates the map-reduce boundary. So the merged map MROper
593
     * is closed and its reduce phase is started. Depending on the number
594
     * is closed and its reduce phase is started. Depending on the number
594
     * of reduce MROpers and the number of pipelines in the map MRoper
595
     * of reduce MROpers and the number of pipelines in the map MRoper
595
     * a Union operator is inserted whenever necessary. This also leads to the 
596
     * a Union operator is inserted whenever necessary. This also leads to the
596
     * possibility of empty map plans. So have to be careful while handling
597
     * possibility of empty map plans. So have to be careful while handling
597
     * it in the PigMapReduce class. If there are no map
598
     * it in the PigMapReduce class. If there are no map
598
     * plans, then a new one is created as a side effect of the merge
599
     * plans, then a new one is created as a side effect of the merge
599
     * process. If there are no reduce MROpers, and only a single pipeline
600
     * process. If there are no reduce MROpers, and only a single pipeline
600
     * in the map, then no union oper is added. Otherwise a Union oper is 
601
     * in the map, then no union oper is added. Otherwise a Union oper is
601
     * added to the merged map MROper to which all the reduce MROpers 
602
     * added to the merged map MROper to which all the reduce MROpers
602
     * are connected by store-load combinations. Care is taken
603
     * are connected by store-load combinations. Care is taken
603
     * to connect the MROpers in the MRPlan.  
604
     * to connect the MROpers in the MRPlan.
604
     * @param op
605
     * @param op
605
     * @throws IOException
606
     * @throws IOException
606
     * @throws PlanException
607
     * @throws PlanException
607
     */
608
     */
608
    private void blocking(PhysicalOperator op) throws IOException, PlanException{
609
    private void blocking(PhysicalOperator op) throws IOException, PlanException{
609
        if(compiledInputs.length==1){
610
        if(compiledInputs.length==1){
610
            MapReduceOper mro = compiledInputs[0];
611
            MapReduceOper mro = compiledInputs[0];
611
            if (!mro.isMapDone()) {
612
            if (!mro.isMapDone()) {
612
                mro.setMapDoneSingle(true);
613
                mro.setMapDoneSingle(true);
613
                curMROp = mro;
614
                curMROp = mro;
614
            }
615
            }
615
            else if(mro.isMapDone() && !mro.isReduceDone()){
616
            else if(mro.isMapDone() && !mro.isReduceDone()){
616
                FileSpec fSpec = getTempFileSpec();
617
                FileSpec fSpec = getTempFileSpec();
617
                
618

   
618
                POStore st = getStore();
619
                POStore st = getStore();
619
                st.setSFile(fSpec);
620
                st.setSFile(fSpec);
620
                mro.reducePlan.addAsLeaf(st);
621
                mro.reducePlan.addAsLeaf(st);
621
                mro.setReduceDone(true);
622
                mro.setReduceDone(true);
622
                curMROp = startNew(fSpec, mro);
623
                curMROp = startNew(fSpec, mro);
623
                curMROp.setMapDone(true);
624
                curMROp.setMapDone(true);
624
            }
625
            }
625
        }
626
        }
626
        else{
627
        else{
627
            List<MapReduceOper> mergedPlans = merge(compiledInputs);
628
            List<MapReduceOper> mergedPlans = merge(compiledInputs);
628
            MapReduceOper mro = mergedPlans.remove(0);
629
            MapReduceOper mro = mergedPlans.remove(0);
629
            
630

   
630
            if(mergedPlans.size()>0)
631
            if(mergedPlans.size()>0)
631
                mro.setMapDoneMultiple(true);
632
                mro.setMapDoneMultiple(true);
632
            else
633
            else
633
                mro.setMapDoneSingle(true);
634
                mro.setMapDoneSingle(true);
634

    
   
635

   
635
            // Connect all the reduce MROpers
636
            // Connect all the reduce MROpers
636
            if(mergedPlans.size()>0)
637
            if(mergedPlans.size()>0)
637
                connRedOper(mergedPlans, mro);
638
                connRedOper(mergedPlans, mro);
638
            curMROp = mro;
639
            curMROp = mro;
639
        }
640
        }
640
    }
641
    }
641
    
642

   
642
    /**
643
    /**
643
     * Connect the reduce MROpers to the leaf node in the map MROper mro
644
     * Connect the reduce MROpers to the leaf node in the map MROper mro
644
     * by adding appropriate loads
645
     * by adding appropriate loads
645
     * @param mergedPlans - The list of reduce MROpers
646
     * @param mergedPlans - The list of reduce MROpers
646
     * @param mro - The map MROper
647
     * @param mro - The map MROper
647
     * @throws PlanException 
648
     * @throws PlanException
648
     * @throws IOException
649
     * @throws IOException
649
     */
650
     */
650
    private void connRedOper(List<MapReduceOper> mergedPlans, MapReduceOper mro) throws PlanException, IOException{
651
    private void connRedOper(List<MapReduceOper> mergedPlans, MapReduceOper mro) throws PlanException, IOException{
651
        PhysicalOperator leaf = null;
652
        PhysicalOperator leaf = null;
652
        List<PhysicalOperator> leaves = mro.mapPlan.getLeaves();
653
        List<PhysicalOperator> leaves = mro.mapPlan.getLeaves();
[+20] [20] 12 lines
[+20] private void connRedOper(List<MapReduceOper> mergedPlans, MapReduceOper mro) throws PlanException, IOException{
665
            if(leaf!=null)
666
            if(leaf!=null)
666
                mro.mapPlan.connect(ld, leaf);
667
                mro.mapPlan.connect(ld, leaf);
667
            MRPlan.connect(mmro, mro);
668
            MRPlan.connect(mmro, mro);
668
        }
669
        }
669
    }
670
    }
670
    
671

   
671
    
672

   
672
    /**
673
    /**
673
     * Force an end to the current map reduce job with a store into a temporary
674
     * Force an end to the current map reduce job with a store into a temporary
674
     * file.
675
     * file.
675
     * @param fSpec Temp file to force a store into.
676
     * @param fSpec Temp file to force a store into.
676
     * @return MR operator that now is finished with a store.
677
     * @return MR operator that now is finished with a store.
[+20] [20] 19 lines
[+20] [+] private MapReduceOper endSingleInputPlanWithStr(FileSpec fSpec) throws PlanException{
696
            String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
697
            String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
697
            throw new PlanException(msg, errCode, PigException.BUG);
698
            throw new PlanException(msg, errCode, PigException.BUG);
698
        }
699
        }
699
        return mro;
700
        return mro;
700
    }
701
    }
701
    
702

   
702
    /**
703
    /**
703
     * Starts a new MRoper and connects it to the old
704
     * Starts a new MRoper and connects it to the old
704
     * one by load-store. The assumption is that the 
705
     * one by load-store. The assumption is that the
705
     * store is already inserted into the old MROper.
706
     * store is already inserted into the old MROper.
706
     * @param fSpec
707
     * @param fSpec
707
     * @param old
708
     * @param old
708
     * @return
709
     * @return
709
     * @throws IOException
710
     * @throws IOException
710
     * @throws PlanException 
711
     * @throws PlanException
711
     */
712
     */
712
    private MapReduceOper startNew(FileSpec fSpec, MapReduceOper old) throws PlanException{
713
    private MapReduceOper startNew(FileSpec fSpec, MapReduceOper old) throws PlanException{
713
        POLoad ld = getLoad();
714
        POLoad ld = getLoad();
714
        ld.setLFile(fSpec);
715
        ld.setLFile(fSpec);
715
        MapReduceOper ret = getMROp();
716
        MapReduceOper ret = getMROp();
716
        ret.mapPlan.add(ld);
717
        ret.mapPlan.add(ld);
717
        MRPlan.add(ret);
718
        MRPlan.add(ret);
718
        MRPlan.connect(old, ret);
719
        MRPlan.connect(old, ret);
719
        return ret;
720
        return ret;
720
    }
721
    }
721
 
722

   
722
    /**
723
    /**
723
     * Returns a temporary DFS Path
724
     * Returns a temporary DFS Path
724
     * @return
725
     * @return
725
     * @throws IOException
726
     * @throws IOException
726
     */
727
     */
727
    private FileSpec getTempFileSpec() throws IOException {
728
    private FileSpec getTempFileSpec() throws IOException {
728
        return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
729
        return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
729
                new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
730
                new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
730
    }
731
    }
731
    
732

   
732
    /**
733
    /**
733
     * Merges the map MROpers in the compiledInputs into a single
734
     * Merges the map MROpers in the compiledInputs into a single
734
     * merged map MRoper and returns a List with the merged map MROper
735
     * merged map MRoper and returns a List with the merged map MROper
735
     * as the first oper and the rest being reduce MROpers.
736
     * as the first oper and the rest being reduce MROpers.
736
     * 
737
     *
737
     * Care is taken to remove the map MROpers that are merged from the
738
     * Care is taken to remove the map MROpers that are merged from the
738
     * MRPlan and their connections moved over to the merged map MROper.
739
     * MRPlan and their connections moved over to the merged map MROper.
739
     * 
740
     *
740
     * Merge is implemented as a sequence of binary merges.
741
     * Merge is implemented as a sequence of binary merges.
741
     * merge(PhyPlan finPlan, List<PhyPlan> lst) := finPlan,merge(p) foreach p in lst 
742
     * merge(PhyPlan finPlan, List<PhyPlan> lst) := finPlan,merge(p) foreach p in lst
742
     *   
743
     *
743
     * @param compiledInputs
744
     * @param compiledInputs
744
     * @return
745
     * @return
745
     * @throws PlanException
746
     * @throws PlanException
746
     * @throws IOException
747
     * @throws IOException
747
     */
748
     */
748
    private List<MapReduceOper> merge(MapReduceOper[] compiledInputs)
749
    private List<MapReduceOper> merge(MapReduceOper[] compiledInputs)
749
            throws PlanException {
750
            throws PlanException {
750
        List<MapReduceOper> ret = new ArrayList<MapReduceOper>();
751
        List<MapReduceOper> ret = new ArrayList<MapReduceOper>();
751
        
752

   
752
        MapReduceOper mergedMap = getMROp();
753
        MapReduceOper mergedMap = getMROp();
753
        ret.add(mergedMap);
754
        ret.add(mergedMap);
754
        MRPlan.add(mergedMap);
755
        MRPlan.add(mergedMap);
755
        
756

   
756
        Set<MapReduceOper> toBeConnected = new HashSet<MapReduceOper>();
757
        Set<MapReduceOper> toBeConnected = new HashSet<MapReduceOper>();
757
        List<MapReduceOper> remLst = new ArrayList<MapReduceOper>();
758
        List<MapReduceOper> remLst = new ArrayList<MapReduceOper>();
758

    
   
759

   
759
        List<PhysicalPlan> mpLst = new ArrayList<PhysicalPlan>();
760
        List<PhysicalPlan> mpLst = new ArrayList<PhysicalPlan>();
760

    
   
761

   
[+20] [20] 8 lines
[+20] private FileSpec getTempFileSpec() throws IOException {
769
                }
770
                }
770
            } else if (mro.isMapDone() && !mro.isReduceDone()) {
771
            } else if (mro.isMapDone() && !mro.isReduceDone()) {
771
                ret.add(mro);
772
                ret.add(mro);
772
            } else {
773
            } else {
773
                int errCode = 2027;
774
                int errCode = 2027;
774
                String msg = "Both map and reduce phases have been done. This is unexpected for a merge."; 
775
                String msg = "Both map and reduce phases have been done. This is unexpected for a merge.";
775
                throw new PlanException(msg, errCode, PigException.BUG);
776
                throw new PlanException(msg, errCode, PigException.BUG);
776
            }
777
            }
777
        }
778
        }
778
        merge(ret.get(0).mapPlan, mpLst);
779
        merge(ret.get(0).mapPlan, mpLst);
779
        
780

   
780
        Iterator<MapReduceOper> it = toBeConnected.iterator();
781
        Iterator<MapReduceOper> it = toBeConnected.iterator();
781
        while(it.hasNext())
782
        while(it.hasNext())
782
            MRPlan.connect(it.next(), mergedMap);
783
            MRPlan.connect(it.next(), mergedMap);
783
        for(MapReduceOper rmro : remLst){
784
        for(MapReduceOper rmro : remLst){
784
            if(rmro.requestedParallelism > mergedMap.requestedParallelism)
785
            if(rmro.requestedParallelism > mergedMap.requestedParallelism)
[+20] [20] 16 lines
[+20] private FileSpec getTempFileSpec() throws IOException {
801
                }
802
                }
802
            }
803
            }
803
            for (PhysicalOperator op : opsToChange) {
804
            for (PhysicalOperator op : opsToChange) {
804
                phyToMROpMap.put(op, mergedMap);
805
                phyToMROpMap.put(op, mergedMap);
805
            }
806
            }
806
            
807

   
807
            MRPlan.remove(rmro);
808
            MRPlan.remove(rmro);
808
        }
809
        }
809
        return ret;
810
        return ret;
810
    }
811
    }
811
    
812

   
812
    /**
813
    /**
813
     * The merge of a list of map plans
814
     * The merge of a list of map plans
814
     * @param <O>
815
     * @param <O>
815
     * @param <E>
816
     * @param <E>
816
     * @param finPlan - Final Plan into which the list of plans is merged
817
     * @param finPlan - Final Plan into which the list of plans is merged
[+20] [20] 11 lines
[+20] private FileSpec getTempFileSpec() throws IOException {
828
        if(plan!=null){
829
        if(plan!=null){
829
            //Process Scalars (UDF with referencedOperators)
830
            //Process Scalars (UDF with referencedOperators)
830
            ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan);
831
            ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan);
831
            scalarPhyFinder.visit();
832
            scalarPhyFinder.visit();
832
            curMROp.scalars.addAll(scalarPhyFinder.getScalars());
833
            curMROp.scalars.addAll(scalarPhyFinder.getScalars());
833
            
834

   
834
            //Process UDFs
835
            //Process UDFs
835
            udfFinder.setPlan(plan);
836
            udfFinder.setPlan(plan);
836
            udfFinder.visit();
837
            udfFinder.visit();
837
            curMROp.UDFs.addAll(udfFinder.getUDFs());
838
            curMROp.UDFs.addAll(udfFinder.getUDFs());
838
        }
839
        }
839
    }
840
    }
840
    
841

   
841
    
842

   
842
    /* The visitOp methods that decide what to do with the current operator */
843
    /* The visitOp methods that decide what to do with the current operator */
843
    
844

   
844
    /**
845
    /**
845
     * Compiles a split operator. The logic is to
846
     * Compiles a split operator. The logic is to
846
     * close the split job by replacing the split oper by
847
     * close the split job by replacing the split oper by
847
     * a store and creating a new Map MRoper and return
848
     * a store and creating a new Map MRoper and return
848
     * that as the current MROper to which other operators
849
     * that as the current MROper to which other operators
849
     * would be compiled into. The new MROper would be connected
850
     * would be compiled into. The new MROper would be connected
850
     * to the split job by load-store. Also add the split oper 
851
     * to the split job by load-store. Also add the split oper
851
     * to the splitsSeen map.
852
     * to the splitsSeen map.
852
     * @param op - The split operator
853
     * @param op - The split operator
853
     * @throws VisitorException
854
     * @throws VisitorException
854
     */
855
     */
855
    @Override
856
    @Override
[+20] [20] 9 lines
[+20] [+] public void visitSplit(POSplit op) throws VisitorException{
865
            int errCode = 2034;
866
            int errCode = 2034;
866
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
867
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
867
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
868
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
868
        }
869
        }
869
    }
870
    }
870
    
871

   
871
    @Override
872
    @Override
872
    public void visitLoad(POLoad op) throws VisitorException{
873
    public void visitLoad(POLoad op) throws VisitorException{
873
        try{
874
        try{
874
            nonBlocking(op);
875
            nonBlocking(op);
875
            phyToMROpMap.put(op, curMROp);
876
            phyToMROpMap.put(op, curMROp);
876
        }catch(Exception e){
877
        }catch(Exception e){
877
            int errCode = 2034;
878
            int errCode = 2034;
878
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
879
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
879
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
880
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
880
        }
881
        }
881
    }
882
    }
882
    
883

   
883
    @Override
884
    @Override
884
    public void visitNative(PONative op) throws VisitorException{
885
    public void visitNative(PONative op) throws VisitorException{
885
        // We will explode the native operator here to add a new MROper for native Mapreduce job
886
        // We will explode the native operator here to add a new MROper for native Mapreduce job
886
        try{
887
        try{
887
            // add a map reduce boundary
888
            // add a map reduce boundary
[+20] [20] 6 lines
[+20] public void visitNative(PONative op) throws VisitorException{
894
            int errCode = 2034;
895
            int errCode = 2034;
895
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
896
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
896
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
897
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
897
        }
898
        }
898
    }
899
    }
899
    
900

   
900
    @Override
901
    @Override
901
    public void visitStore(POStore op) throws VisitorException{
902
    public void visitStore(POStore op) throws VisitorException{
902
        try{
903
        try{
903
            nonBlocking(op);
904
            nonBlocking(op);
904
            phyToMROpMap.put(op, curMROp);
905
            phyToMROpMap.put(op, curMROp);
905
            if (op.getSFile()!=null && op.getSFile().getFuncSpec()!=null)
906
            if (op.getSFile()!=null && op.getSFile().getFuncSpec()!=null)
906
                curMROp.UDFs.add(op.getSFile().getFuncSpec().toString());
907
                curMROp.UDFs.add(op.getSFile().getFuncSpec().toString());
907
        }catch(Exception e){
908
        }catch(Exception e){
908
            int errCode = 2034;
909
            int errCode = 2034;
909
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
910
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
910
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
911
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
911
        }
912
        }
912
    }
913
    }
913
    
914

   
914
    @Override
915
    @Override
915
    public void visitFilter(POFilter op) throws VisitorException{
916
    public void visitFilter(POFilter op) throws VisitorException{
916
        try{
917
        try{
917
            nonBlocking(op);
918
            nonBlocking(op);
918
            processUDFs(op.getPlan());
919
            processUDFs(op.getPlan());
919
            phyToMROpMap.put(op, curMROp);
920
            phyToMROpMap.put(op, curMROp);
920
        }catch(Exception e){
921
        }catch(Exception e){
921
            int errCode = 2034;
922
            int errCode = 2034;
922
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
923
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
923
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
924
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
924
        }
925
        }
925
    }
926
    }
926
    
927

   
927
    @Override
928
    @Override
928
    public void visitCross(POCross op) throws VisitorException {
929
    public void visitCross(POCross op) throws VisitorException {
929
        try{
930
        try{
930
            nonBlocking(op);
931
            nonBlocking(op);
931
            phyToMROpMap.put(op, curMROp);
932
            phyToMROpMap.put(op, curMROp);
[+20] [20] 13 lines
[+20] [+] public void visitStream(POStream op) throws VisitorException{
945
            int errCode = 2034;
946
            int errCode = 2034;
946
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
947
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
947
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
948
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
948
        }
949
        }
949
    }
950
    }
950
    
951

   
951
    @Override
952
    @Override
952
    public void visitLimit(POLimit op) throws VisitorException{
953
    public void visitLimit(POLimit op) throws VisitorException{
953
        try{
954
        try{
954
            MapReduceOper mro = compiledInputs[0];
955
            MapReduceOper mro = compiledInputs[0];
955
            mro.limit = op.getLimit();
956
            mro.limit = op.getLimit();
[+20] [20] 7 lines
[+20] public void visitLimit(POLimit op) throws VisitorException{
963
                if (!pigContext.inIllustrator)
964
                if (!pigContext.inIllustrator)
964
                {
965
                {
965
                    mro.mapPlan.addAsLeaf(op);
966
                    mro.mapPlan.addAsLeaf(op);
966
                    mro.setMapDone(true);
967
                    mro.setMapDone(true);
967
                }
968
                }
968
                
969

   
969
                if (mro.reducePlan.isEmpty())
970
                if (mro.reducePlan.isEmpty())
970
                {
971
                {
971
                    MRUtil.simpleConnectMapToReduce(mro, scope, nig);
972
                    MRUtil.simpleConnectMapToReduce(mro, scope, nig);
972
                    mro.requestedParallelism = 1;
973
                    mro.requestedParallelism = 1;
973
                    if (!pigContext.inIllustrator) {
974
                    if (!pigContext.inIllustrator) {
[+20] [20] 5 lines
[+20] public void visitLimit(POLimit op) throws VisitorException{
979
                        mro.reducePlan.addAsLeaf(op);
980
                        mro.reducePlan.addAsLeaf(op);
980
                    }
981
                    }
981
                }
982
                }
982
                else
983
                else
983
                {
984
                {
984
                    messageCollector.collect("Something in the reduce plan while map plan is not done. Something wrong!", 
985
                    messageCollector.collect("Something in the reduce plan while map plan is not done. Something wrong!",
985
                    		MessageType.Warning, PigWarning.REDUCE_PLAN_NOT_EMPTY_WHILE_MAP_PLAN_UNDER_PROCESS);
986
                    		MessageType.Warning, PigWarning.REDUCE_PLAN_NOT_EMPTY_WHILE_MAP_PLAN_UNDER_PROCESS);
986
                }
987
                }
987
            } else if (mro.isMapDone() && !mro.isReduceDone()) {
988
            } else if (mro.isMapDone() && !mro.isReduceDone()) {
988
            	// limit should add into reduce plan
989
            	// limit should add into reduce plan
989
                mro.reducePlan.addAsLeaf(op);
990
                mro.reducePlan.addAsLeaf(op);
[+20] [20] 25 lines
[+20] [+] public void visitLocalRearrange(POLocalRearrange op) throws VisitorException {
1015
        }
1016
        }
1016
    }
1017
    }
1017

    
   
1018

   
1018
    @Override
1019
    @Override
1019
    public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
1020
    public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
1020
        
1021

   
1021
        if(!curMROp.mapDone){
1022
        if(!curMROp.mapDone){
1022
            
1023

   
1023
            List<PhysicalOperator> roots = curMROp.mapPlan.getRoots();
1024
            List<PhysicalOperator> roots = curMROp.mapPlan.getRoots();
1024
            if(roots.size() != 1){
1025
            if(roots.size() != 1){
1025
                int errCode = 2171;
1026
                int errCode = 2171;
1026
                String errMsg = "Expected one but found more then one root physical operator in physical plan.";
1027
                String errMsg = "Expected one but found more then one root physical operator in physical plan.";
1027
                throw new MRCompilerException(errMsg,errCode,PigException.BUG);
1028
                throw new MRCompilerException(errMsg,errCode,PigException.BUG);
1028
            }
1029
            }
1029
            
1030

   
1030
            PhysicalOperator phyOp = roots.get(0);
1031
            PhysicalOperator phyOp = roots.get(0);
1031
            if(! (phyOp instanceof POLoad)){
1032
            if(! (phyOp instanceof POLoad)){
1032
                int errCode = 2172;
1033
                int errCode = 2172;
1033
                String errMsg = "Expected physical operator at root to be POLoad. Found : "+phyOp.getClass().getCanonicalName();
1034
                String errMsg = "Expected physical operator at root to be POLoad. Found : "+phyOp.getClass().getCanonicalName();
1034
                throw new MRCompilerException(errMsg,errCode,PigException.BUG);
1035
                throw new MRCompilerException(errMsg,errCode,PigException.BUG);
1035
            }
1036
            }
1036
            
1037

   
1037
            
1038

   
1038
            LoadFunc loadFunc = ((POLoad)phyOp).getLoadFunc();
1039
            LoadFunc loadFunc = ((POLoad)phyOp).getLoadFunc();
1039
            try {
1040
            try {
1040
                if(!(CollectableLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
1041
                if(!(CollectableLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
1041
                    int errCode = 2249;
1042
                    int errCode = 2249;
1042
                    throw new MRCompilerException("While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.", errCode);
1043
                    throw new MRCompilerException("While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.", errCode);
[+20] [20] 16 lines
[+20] public void visitLocalRearrange(POLocalRearrange op) throws VisitorException {
1059
                phyToMROpMap.put(op, curMROp);
1060
                phyToMROpMap.put(op, curMROp);
1060
            }catch(Exception e){
1061
            }catch(Exception e){
1061
                int errCode = 2034;
1062
                int errCode = 2034;
1062
                String msg = "Error compiling operator " + op.getClass().getSimpleName();
1063
                String msg = "Error compiling operator " + op.getClass().getSimpleName();
1063
                throw new MRCompilerException(msg, errCode, PigException.BUG, e);
1064
                throw new MRCompilerException(msg, errCode, PigException.BUG, e);
1064
            }    

   
1065
        }
1065
            }

    
   
1066
        }
1066
        else if(!curMROp.reduceDone){
1067
        else if(!curMROp.reduceDone){
1067
        	int errCode=2250;
1068
        	int errCode=2250;
1068
            String msg = "Blocking operators are not allowed before Collected Group. Consider dropping using 'collected'.";
1069
            String msg = "Blocking operators are not allowed before Collected Group. Consider dropping using 'collected'.";
1069
            throw new MRCompilerException(msg, errCode, PigException.BUG);   
1070
            throw new MRCompilerException(msg, errCode, PigException.BUG);
1070
        }
1071
        }
1071
        else{
1072
        else{
1072
            int errCode = 2022;
1073
            int errCode = 2022;
1073
            String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
1074
            String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
1074
            throw new MRCompilerException(msg, errCode, PigException.BUG);   
1075
            throw new MRCompilerException(msg, errCode, PigException.BUG);
1075
        }
1076
        }
1076
        
1077

   
1077
    }
1078
    }
1078

    
   
1079

   
1079
    @Override
1080
    @Override
1080
    public void visitPOForEach(POForEach op) throws VisitorException{
1081
    public void visitPOForEach(POForEach op) throws VisitorException{
1081
        try{
1082
        try{
[+20] [20] 8 lines
[+20] public void visitPOForEach(POForEach op) throws VisitorException{
1090
            int errCode = 2034;
1091
            int errCode = 2034;
1091
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
1092
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
1092
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
1093
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
1093
        }
1094
        }
1094
    }
1095
    }
1095
    
1096

   
1096
    @Override
1097
    @Override
1097
    public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{
1098
    public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{
1098
        try{
1099
        try{
1099
            blocking(op);
1100
            blocking(op);
1100
            curMROp.customPartitioner = op.getCustomPartitioner();
1101
            curMROp.customPartitioner = op.getCustomPartitioner();
1101
            phyToMROpMap.put(op, curMROp);
1102
            phyToMROpMap.put(op, curMROp);
1102
        }catch(Exception e){
1103
        }catch(Exception e){
1103
            int errCode = 2034;
1104
            int errCode = 2034;
1104
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
1105
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
1105
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
1106
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
1106
        }
1107
        }
1107
    }
1108
    }
1108
    
1109

   
1109
    @Override
1110
    @Override
1110
    public void visitPackage(POPackage op) throws VisitorException{
1111
    public void visitPackage(POPackage op) throws VisitorException{
1111
        try{
1112
        try{
1112
            nonBlocking(op);
1113
            nonBlocking(op);
1113
            phyToMROpMap.put(op, curMROp);
1114
            phyToMROpMap.put(op, curMROp);
[+20] [20] 10 lines
[+20] public void visitPackage(POPackage op) throws VisitorException{
1124
            int errCode = 2034;
1125
            int errCode = 2034;
1125
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
1126
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
1126
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
1127
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
1127
        }
1128
        }
1128
    }
1129
    }
1129
    
1130

   
1130
    @Override
1131
    @Override
1131
    public void visitUnion(POUnion op) throws VisitorException{
1132
    public void visitUnion(POUnion op) throws VisitorException{
1132
        try{
1133
        try{
1133
            nonBlocking(op);
1134
            nonBlocking(op);
1134
            phyToMROpMap.put(op, curMROp);
1135
            phyToMROpMap.put(op, curMROp);
1135
        }catch(Exception e){
1136
        }catch(Exception e){
1136
            int errCode = 2034;
1137
            int errCode = 2034;
1137
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
1138
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
1138
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
1139
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
1139
        }
1140
        }
1140
    }
1141
    }
1141
            
1142

   
1142
    /**
1143
    /**
1143
     * This is an operator which will have multiple inputs(= to number of join inputs)
1144
     * This is an operator which will have multiple inputs(= to number of join inputs)
1144
     * But it prunes off all inputs but the fragment input and creates separate MR jobs
1145
     * But it prunes off all inputs but the fragment input and creates separate MR jobs
1145
     * for each of the replicated inputs and uses these as the replicated files that
1146
     * for each of the replicated inputs and uses these as the replicated files that
1146
     * are configured in the POFRJoin operator. It also sets that this is FRJoin job
1147
     * are configured in the POFRJoin operator. It also sets that this is FRJoin job
[+20] [20] 6 lines
[+20] [+] public void visitFRJoin(POFRJoin op) throws VisitorException {
1153
            for (int i=0; i<replFiles.length; i++) {
1154
            for (int i=0; i<replFiles.length; i++) {
1154
                if(i==op.getFragment()) continue;
1155
                if(i==op.getFragment()) continue;
1155
                replFiles[i] = getTempFileSpec();
1156
                replFiles[i] = getTempFileSpec();
1156
            }
1157
            }
1157
            op.setReplFiles(replFiles);
1158
            op.setReplFiles(replFiles);
1158
            

   
1159

    
   
1159

   

    
   
1160

   
1160
            curMROp = phyToMROpMap.get(op.getInputs().get(op.getFragment()));
1161
            curMROp = phyToMROpMap.get(op.getInputs().get(op.getFragment()));
1161
            for(int i=0;i<compiledInputs.length;i++){
1162
            for(int i=0;i<compiledInputs.length;i++){
1162
                MapReduceOper mro = compiledInputs[i];
1163
                MapReduceOper mro = compiledInputs[i];
1163
                if(curMROp.equals(mro))
1164
                if(curMROp.equals(mro))
1164
                    continue;
1165
                    continue;
1165
                POStore str = getStore();
1166
                POStore str = getStore();
1166
                str.setSFile(replFiles[i]);
1167
                str.setSFile(replFiles[i]);
1167
                
1168

   
1168
                Configuration conf = 
1169
                Configuration conf =
1169
                    ConfigurationUtil.toConfiguration(pigContext.getProperties());
1170
                    ConfigurationUtil.toConfiguration(pigContext.getProperties());
1170
                boolean combinable = !conf.getBoolean("pig.noSplitCombination", false);
1171
                boolean combinable = !conf.getBoolean("pig.noSplitCombination", false);
1171
                
1172

   
1172
                if (!mro.isMapDone()) {   
1173
                if (!mro.isMapDone()) {
1173
                    if (combinable && hasTooManyInputFiles(mro, conf)) { 
1174
                    if (combinable && hasTooManyInputFiles(mro, conf)) {
1174
                        POStore tmpSto = getStore();
1175
                        POStore tmpSto = getStore();
1175
                        FileSpec fSpec = getTempFileSpec();
1176
                        FileSpec fSpec = getTempFileSpec();
1176
                        tmpSto.setSFile(fSpec);                         
1177
                        tmpSto.setSFile(fSpec);
1177
                        mro.mapPlan.addAsLeaf(tmpSto);
1178
                        mro.mapPlan.addAsLeaf(tmpSto);
1178
                        mro.setMapDoneSingle(true);                    
1179
                        mro.setMapDoneSingle(true);
1179
                        MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str); 
1180
                        MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str);
1180
                        MRPlan.connect(catMROp, curMROp);
1181
                        MRPlan.connect(catMROp, curMROp);
1181
                    } else {
1182
                    } else {
1182
                        mro.mapPlan.addAsLeaf(str);
1183
                        mro.mapPlan.addAsLeaf(str);
1183
                        mro.setMapDoneSingle(true); 
1184
                        mro.setMapDoneSingle(true);
1184
                        MRPlan.connect(mro, curMROp);
1185
                        MRPlan.connect(mro, curMROp);
1185
                    }
1186
                    }
1186
                } else if (mro.isMapDone() && !mro.isReduceDone()) {
1187
                } else if (mro.isMapDone() && !mro.isReduceDone()) {
1187
                    if (combinable && (mro.requestedParallelism >= fileConcatenationThreshold)) {
1188
                    if (combinable && (mro.requestedParallelism >= fileConcatenationThreshold)) {
1188
                        POStore tmpSto = getStore();
1189
                        POStore tmpSto = getStore();
1189
                        FileSpec fSpec = getTempFileSpec();
1190
                        FileSpec fSpec = getTempFileSpec();
1190
                        tmpSto.setSFile(fSpec); 
1191
                        tmpSto.setSFile(fSpec);
1191
                        mro.reducePlan.addAsLeaf(tmpSto);
1192
                        mro.reducePlan.addAsLeaf(tmpSto);
1192
                        mro.setReduceDone(true);
1193
                        mro.setReduceDone(true);
1193
                        MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str); 
1194
                        MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str);
1194
                        MRPlan.connect(catMROp, curMROp);
1195
                        MRPlan.connect(catMROp, curMROp);
1195
                    } else {
1196
                    } else {
1196
                        mro.reducePlan.addAsLeaf(str);
1197
                        mro.reducePlan.addAsLeaf(str);
1197
                        mro.setReduceDone(true);
1198
                        mro.setReduceDone(true);
1198
                        MRPlan.connect(mro, curMROp);
1199
                        MRPlan.connect(mro, curMROp);
1199
                    }
1200
                    }
1200
                } else {
1201
                } else {
1201
                    int errCode = 2022;
1202
                    int errCode = 2022;
1202
                    String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
1203
                    String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
1203
                    throw new PlanException(msg, errCode, PigException.BUG);
1204
                    throw new PlanException(msg, errCode, PigException.BUG);
1204
                }              

   
1205
            }
1205
                }
1206
            
1206
            }

    
   
1207

   
1207
            if (!curMROp.isMapDone()) {
1208
            if (!curMROp.isMapDone()) {
1208
                curMROp.mapPlan.addAsLeaf(op);
1209
                curMROp.mapPlan.addAsLeaf(op);
1209
            } else if (curMROp.isMapDone() && !curMROp.isReduceDone()) {
1210
            } else if (curMROp.isMapDone() && !curMROp.isReduceDone()) {
1210
                curMROp.reducePlan.addAsLeaf(op);
1211
                curMROp.reducePlan.addAsLeaf(op);
1211
            } else {
1212
            } else {
[+20] [20] 14 lines
[+20] public void visitFRJoin(POFRJoin op) throws VisitorException {
1226
            int errCode = 2034;
1227
            int errCode = 2034;
1227
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
1228
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
1228
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
1229
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
1229
        }
1230
        }
1230
    }
1231
    }
1231
  
1232

   
1232
    @SuppressWarnings("unchecked")
1233
    @SuppressWarnings("unchecked")
1233
    private boolean hasTooManyInputFiles(MapReduceOper mro, Configuration conf) {
1234
    private boolean hasTooManyInputFiles(MapReduceOper mro, Configuration conf) {
1234
        if (pigContext == null || pigContext.getExecType() == ExecType.LOCAL) {
1235
        if (pigContext == null || pigContext.getExecType() == ExecType.LOCAL) {
1235
            return false;
1236
            return false;
1236
        }
1237
        }
1237
        
1238

   
1238
        if (mro instanceof NativeMapReduceOper) {
1239
        if (mro instanceof NativeMapReduceOper) {
1239
            return optimisticFileConcatenation ? false : true;
1240
            return optimisticFileConcatenation ? false : true;
1240
        }
1241
        }
1241
               
1242

   
1242
        PhysicalPlan mapPlan = mro.mapPlan;
1243
        PhysicalPlan mapPlan = mro.mapPlan;
1243
        
1244

   
1244
        List<PhysicalOperator> roots = mapPlan.getRoots();
1245
        List<PhysicalOperator> roots = mapPlan.getRoots();
1245
        if (roots == null || roots.size() == 0) return false;
1246
        if (roots == null || roots.size() == 0) return false;
1246
        
1247

   
1247
        int numFiles = 0;
1248
        int numFiles = 0;
1248
        boolean ret = false;
1249
        boolean ret = false;
1249
        try {
1250
        try {
1250
            for (PhysicalOperator root : roots) {
1251
            for (PhysicalOperator root : roots) {
1251
                POLoad ld = (POLoad) root;
1252
                POLoad ld = (POLoad) root;
1252
                String fileName = ld.getLFile().getFileName();
1253
                String fileName = ld.getLFile().getFileName();
1253
                
1254

   
1254
                if(UriUtil.isHDFSFile(fileName)){
1255
                if(UriUtil.isHDFSFile(fileName)){
1255
                    // Only if the input is an hdfs file, this optimization is 
1256
                    // Only if the input is an hdfs file, this optimization is
1256
                    // useful (to reduce load on namenode)
1257
                    // useful (to reduce load on namenode)
1257
                    
1258

   
1258
                    //separate out locations separated by comma
1259
                    //separate out locations separated by comma
1259
                    String [] locations = LoadFunc.getPathStrings(fileName);
1260
                    String [] locations = LoadFunc.getPathStrings(fileName);
1260
                    for(String location : locations){
1261
                    for(String location : locations){
1261
                        if(!UriUtil.isHDFSFile(location))
1262
                        if(!UriUtil.isHDFSFile(location))
1262
                            continue;
1263
                            continue;
[+20] [20] 15 lines
[+20] private boolean hasTooManyInputFiles(MapReduceOper mro, Configuration conf) {
1278
                            numFiles += results.size();
1279
                            numFiles += results.size();
1279
                        } else {
1280
                        } else {
1280
                            List<MapReduceOper> preds = MRPlan.getPredecessors(mro);
1281
                            List<MapReduceOper> preds = MRPlan.getPredecessors(mro);
1281
                            if (preds != null && preds.size() == 1) {
1282
                            if (preds != null && preds.size() == 1) {
1282
                                MapReduceOper pred = preds.get(0);
1283
                                MapReduceOper pred = preds.get(0);
1283
                                if (!pred.reducePlan.isEmpty()) { 
1284
                                if (!pred.reducePlan.isEmpty()) {
1284
                                    numFiles += pred.requestedParallelism;
1285
                                    numFiles += pred.requestedParallelism;
1285
                                } else { // map-only job
1286
                                } else { // map-only job
1286
                                    ret = hasTooManyInputFiles(pred, conf);
1287
                                    ret = hasTooManyInputFiles(pred, conf);
1287
                                    break;
1288
                                    break;
1288
                                }
1289
                                }
1289
                            } else if (!optimisticFileConcatenation) {                    
1290
                            } else if (!optimisticFileConcatenation) {
1290
                                // can't determine the number of input files. 
1291
                                // can't determine the number of input files.
1291
                                // Treat it as having too manyfiles
1292
                                // Treat it as having too manyfiles
1292
                                numFiles = fileConcatenationThreshold;
1293
                                numFiles = fileConcatenationThreshold;
1293
                                break;
1294
                                break;
1294
                            }
1295
                            }
1295
                        }
1296
                        }
1296
                    }
1297
                    }
1297
                }
1298
                }
1298
            }
1299
            }
1299
        } catch (IOException e) {
1300
        } catch (IOException e) {
1300
            LOG.warn("failed to get number of input files", e); 
1301
            LOG.warn("failed to get number of input files", e);
1301
        } catch (InterruptedException e) {
1302
        } catch (InterruptedException e) {
1302
            LOG.warn("failed to get number of input files", e); 
1303
            LOG.warn("failed to get number of input files", e);
1303
        }
1304
        }
1304
                
1305

   
1305
        LOG.info("number of input files: " + numFiles);
1306
        LOG.info("number of input files: " + numFiles);
1306
        return ret ? true : (numFiles >= fileConcatenationThreshold);
1307
        return ret ? true : (numFiles >= fileConcatenationThreshold);
1307
    }
1308
    }
1308
    
1309

   
1309
    /*
1310
    /*
1310
     * Use Mult File Combiner to concatenate small input files
1311
     * Use Mult File Combiner to concatenate small input files
1311
     */
1312
     */
1312
    private MapReduceOper getConcatenateJob(FileSpec fSpec, MapReduceOper old, POStore str)
1313
    private MapReduceOper getConcatenateJob(FileSpec fSpec, MapReduceOper old, POStore str)
1313
            throws PlanException, ExecException {
1314
            throws PlanException, ExecException {
1314
        
1315

   
1315
        MapReduceOper mro = startNew(fSpec, old);
1316
        MapReduceOper mro = startNew(fSpec, old);
1316
        mro.mapPlan.addAsLeaf(str);
1317
        mro.mapPlan.addAsLeaf(str);
1317
        mro.setMapDone(true);
1318
        mro.setMapDone(true);
1318
        
1319

   
1319
        LOG.info("Insert a file-concatenation job");
1320
        LOG.info("Insert a file-concatenation job");
1320
                
1321

   
1321
        return mro;
1322
        return mro;
1322
    }    
1323
    }
1323
    
1324

   
1324
    /** Leftmost relation is referred as base relation (this is the one fed into mappers.) 
1325
    /** Leftmost relation is referred as base relation (this is the one fed into mappers.)
1325
     *  First, close all MROpers except for first one (referred as baseMROPer)
1326
     *  First, close all MROpers except for first one (referred as baseMROPer)
1326
     *  Then, create a MROper which will do indexing job (idxMROper)
1327
     *  Then, create a MROper which will do indexing job (idxMROper)
1327
     *  Connect idxMROper before the mappedMROper in the MRPlan.
1328
     *  Connect idxMROper before the mappedMROper in the MRPlan.
1328
     */
1329
     */
1329

    
   
1330

   
[+20] [20] 4 lines
[+20] [+] public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws VisitorException {
1334
            int errCode=2251;
1335
            int errCode=2251;
1335
            String errMsg = "Merge Cogroup work on two or more relations." +
1336
            String errMsg = "Merge Cogroup work on two or more relations." +
1336
            		"To use map-side group-by on single relation, use 'collected' qualifier.";
1337
            		"To use map-side group-by on single relation, use 'collected' qualifier.";
1337
            throw new MRCompilerException(errMsg, errCode);
1338
            throw new MRCompilerException(errMsg, errCode);
1338
        }
1339
        }
1339
            
1340

   
1340
        List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>(compiledInputs.length-1);
1341
        List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>(compiledInputs.length-1);
1341
        List<String> fileSpecs = new ArrayList<String>(compiledInputs.length-1);
1342
        List<String> fileSpecs = new ArrayList<String>(compiledInputs.length-1);
1342
        List<String> loaderSigns = new ArrayList<String>(compiledInputs.length-1);
1343
        List<String> loaderSigns = new ArrayList<String>(compiledInputs.length-1);
1343
        
1344

   
1344
        try{
1345
        try{
1345
            // Iterate through all the MROpers, disconnect side MROPers from 
1346
            // Iterate through all the MROpers, disconnect side MROPers from
1346
            // MROPerPlan and collect all the information needed in different lists.
1347
            // MROPerPlan and collect all the information needed in different lists.
1347
            
1348

   
1348
            for(int i=0 ; i < compiledInputs.length; i++){
1349
            for(int i=0 ; i < compiledInputs.length; i++){
1349
                
1350

   
1350
                MapReduceOper mrOper = compiledInputs[i];
1351
                MapReduceOper mrOper = compiledInputs[i];
1351
                PhysicalPlan mapPlan = mrOper.mapPlan;
1352
                PhysicalPlan mapPlan = mrOper.mapPlan;
1352
                if(mapPlan.getRoots().size() != 1){
1353
                if(mapPlan.getRoots().size() != 1){
1353
                    int errCode = 2171;
1354
                    int errCode = 2171;
1354
                    String errMsg = "Expected one but found more then one root physical operator in physical plan.";
1355
                    String errMsg = "Expected one but found more then one root physical operator in physical plan.";
[+20] [20] 4 lines
[+20] public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws VisitorException {
1359
                if(! (rootPOOp instanceof POLoad)){
1360
                if(! (rootPOOp instanceof POLoad)){
1360
                    int errCode = 2172;
1361
                    int errCode = 2172;
1361
                    String errMsg = "Expected physical operator at root to be POLoad. Found : "+rootPOOp.getClass().getCanonicalName();
1362
                    String errMsg = "Expected physical operator at root to be POLoad. Found : "+rootPOOp.getClass().getCanonicalName();
1362
                    throw new MRCompilerException(errMsg,errCode);
1363
                    throw new MRCompilerException(errMsg,errCode);
1363
                }
1364
                }
1364
                
1365

   
1365
                POLoad sideLoader = (POLoad)rootPOOp;
1366
                POLoad sideLoader = (POLoad)rootPOOp;
1366
                FileSpec loadFileSpec = sideLoader.getLFile();
1367
                FileSpec loadFileSpec = sideLoader.getLFile();
1367
                FuncSpec funcSpec = loadFileSpec.getFuncSpec();
1368
                FuncSpec funcSpec = loadFileSpec.getFuncSpec();
1368
                LoadFunc loadfunc = sideLoader.getLoadFunc();
1369
                LoadFunc loadfunc = sideLoader.getLoadFunc();
1369
                if(i == 0){
1370
                if(i == 0){
1370
                    
1371

   
1371
                    if(!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))){
1372
                    if(!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))){
1372
                    	int errCode = 2252;
1373
                    	int errCode = 2252;
1373
                        throw new MRCompilerException("Base loader in Cogroup must implement CollectableLoadFunc.", errCode);
1374
                        throw new MRCompilerException("Base loader in Cogroup must implement CollectableLoadFunc.", errCode);
1374
                    }
1375
                    }
1375
                    
1376

   
1376
                    ((CollectableLoadFunc)loadfunc).ensureAllKeyInstancesInSameSplit();
1377
                    ((CollectableLoadFunc)loadfunc).ensureAllKeyInstancesInSameSplit();
1377
                    continue;
1378
                    continue;
1378
                }
1379
                }
1379
                if(!(IndexableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))){
1380
                if(!(IndexableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))){
1380
                    int errCode = 2253;
1381
                    int errCode = 2253;
1381
                    throw new MRCompilerException("Side loaders in cogroup must implement IndexableLoadFunc.", errCode);
1382
                    throw new MRCompilerException("Side loaders in cogroup must implement IndexableLoadFunc.", errCode);
1382
                }
1383
                }
1383
                
1384

   
1384
                funcSpecs.add(funcSpec);
1385
                funcSpecs.add(funcSpec);
1385
                fileSpecs.add(loadFileSpec.getFileName());
1386
                fileSpecs.add(loadFileSpec.getFileName());
1386
                loaderSigns.add(sideLoader.getSignature());
1387
                loaderSigns.add(sideLoader.getSignature());
1387
                MRPlan.remove(mrOper);
1388
                MRPlan.remove(mrOper);
1388
            }
1389
            }
1389
            
1390

   
1390
            poCoGrp.setSideLoadFuncs(funcSpecs);
1391
            poCoGrp.setSideLoadFuncs(funcSpecs);
1391
            poCoGrp.setSideFileSpecs(fileSpecs);
1392
            poCoGrp.setSideFileSpecs(fileSpecs);
1392
            poCoGrp.setLoaderSignatures(loaderSigns);
1393
            poCoGrp.setLoaderSignatures(loaderSigns);
1393
            
1394

   
1394
            // Use map-reduce operator of base relation for the cogroup operation.
1395
            // Use map-reduce operator of base relation for the cogroup operation.
1395
            MapReduceOper baseMROp = phyToMROpMap.get(poCoGrp.getInputs().get(0));
1396
            MapReduceOper baseMROp = phyToMROpMap.get(poCoGrp.getInputs().get(0));
1396
            if(baseMROp.mapDone || !baseMROp.reducePlan.isEmpty()){
1397
            if(baseMROp.mapDone || !baseMROp.reducePlan.isEmpty()){
1397
                int errCode = 2254;
1398
                int errCode = 2254;
1398
                throw new MRCompilerException("Currently merged cogroup is not supported after blocking operators.", errCode);
1399
                throw new MRCompilerException("Currently merged cogroup is not supported after blocking operators.", errCode);
1399
            }
1400
            }
1400
            
1401

   
1401
            // Create new map-reduce operator for indexing job and then configure it.
1402
            // Create new map-reduce operator for indexing job and then configure it.
1402
            MapReduceOper indexerMROp = getMROp();
1403
            MapReduceOper indexerMROp = getMROp();
1403
            FileSpec idxFileSpec = getIndexingJob(indexerMROp, baseMROp, poCoGrp.getLRInnerPlansOf(0));
1404
            FileSpec idxFileSpec = getIndexingJob(indexerMROp, baseMROp, poCoGrp.getLRInnerPlansOf(0));
1404
            poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
1405
            poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
1405
            poCoGrp.setIndexFileName(idxFileSpec.getFileName());
1406
            poCoGrp.setIndexFileName(idxFileSpec.getFileName());
1406
            
1407

   
1407
            baseMROp.mapPlan.addAsLeaf(poCoGrp);
1408
            baseMROp.mapPlan.addAsLeaf(poCoGrp);
1408
            for (FuncSpec funcSpec : funcSpecs)
1409
            for (FuncSpec funcSpec : funcSpecs)
1409
                baseMROp.UDFs.add(funcSpec.toString());
1410
                baseMROp.UDFs.add(funcSpec.toString());
1410
            MRPlan.add(indexerMROp);
1411
            MRPlan.add(indexerMROp);
1411
            MRPlan.connect(indexerMROp, baseMROp);
1412
            MRPlan.connect(indexerMROp, baseMROp);
[+20] [20] 21 lines
[+20] public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws VisitorException {
1433
            int errCode = 3000;
1434
            int errCode = 3000;
1434
            String errMsg = "IOException caught while compiling POMergeCoGroup";
1435
            String errMsg = "IOException caught while compiling POMergeCoGroup";
1435
            throw new MRCompilerException(errMsg, errCode,e);
1436
            throw new MRCompilerException(errMsg, errCode,e);
1436
        }
1437
        }
1437
    }
1438
    }
1438
    
1439

   
1439
    // Sets up the indexing job for map-side cogroups.
1440
    // Sets up the indexing job for map-side cogroups.
1440
    private FileSpec getIndexingJob(MapReduceOper indexerMROp, 
1441
    private FileSpec getIndexingJob(MapReduceOper indexerMROp,
1441
            final MapReduceOper baseMROp, final List<PhysicalPlan> mapperLRInnerPlans)
1442
            final MapReduceOper baseMROp, final List<PhysicalPlan> mapperLRInnerPlans)
1442
        throws MRCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException {
1443
        throws MRCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException {
1443
        
1444

   
1444
        // First replace loader with  MergeJoinIndexer.
1445
        // First replace loader with  MergeJoinIndexer.
1445
        PhysicalPlan baseMapPlan = baseMROp.mapPlan;
1446
        PhysicalPlan baseMapPlan = baseMROp.mapPlan;
1446
        POLoad baseLoader = (POLoad)baseMapPlan.getRoots().get(0);                            
1447
        POLoad baseLoader = (POLoad)baseMapPlan.getRoots().get(0);
1447
        FileSpec origLoaderFileSpec = baseLoader.getLFile();
1448
        FileSpec origLoaderFileSpec = baseLoader.getLFile();
1448
        FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec();
1449
        FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec();
1449
        LoadFunc loadFunc = baseLoader.getLoadFunc();
1450
        LoadFunc loadFunc = baseLoader.getLoadFunc();
1450
        
1451

   
1451
        if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
1452
        if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
1452
            int errCode = 1104;
1453
            int errCode = 1104;
1453
            String errMsg = "Base relation of merge-coGroup must implement " +
1454
            String errMsg = "Base relation of merge-coGroup must implement " +
1454
            "OrderedLoadFunc interface. The specified loader " 
1455
            "OrderedLoadFunc interface. The specified loader "
1455
            + funcSpec + " doesn't implement it";
1456
            + funcSpec + " doesn't implement it";
1456
            throw new MRCompilerException(errMsg,errCode);
1457
            throw new MRCompilerException(errMsg,errCode);
1457
        }
1458
        }
1458
        
1459

   
1459
        String[] indexerArgs = new String[6];
1460
        String[] indexerArgs = new String[6];
1460
        indexerArgs[0] = funcSpec.toString();
1461
        indexerArgs[0] = funcSpec.toString();
1461
        indexerArgs[1] = ObjectSerializer.serialize((Serializable)mapperLRInnerPlans);
1462
        indexerArgs[1] = ObjectSerializer.serialize((Serializable)mapperLRInnerPlans);
1462
        indexerArgs[3] = baseLoader.getSignature();
1463
        indexerArgs[3] = baseLoader.getSignature();
1463
        indexerArgs[4] = baseLoader.getOperatorKey().scope;
1464
        indexerArgs[4] = baseLoader.getOperatorKey().scope;
1464
        indexerArgs[5] = Boolean.toString(false); // we care for nulls. 
1465
        indexerArgs[5] = Boolean.toString(false); // we care for nulls.
1465
            
1466

   
1466
        PhysicalPlan phyPlan;
1467
        PhysicalPlan phyPlan;
1467
        if (baseMapPlan.getSuccessors(baseLoader) == null 
1468
        if (baseMapPlan.getSuccessors(baseLoader) == null
1468
                || baseMapPlan.getSuccessors(baseLoader).isEmpty()){
1469
                || baseMapPlan.getSuccessors(baseLoader).isEmpty()){
1469
         // Load-Load-Cogroup case.
1470
         // Load-Load-Cogroup case.
1470
            phyPlan = null; 
1471
            phyPlan = null;
1471
        }
1472
        }
1472
            
1473

   
1473
        else{ // We got something. Yank it and set it as inner plan.
1474
        else{ // We got something. Yank it and set it as inner plan.
1474
            phyPlan = baseMapPlan.clone();
1475
            phyPlan = baseMapPlan.clone();
1475
            PhysicalOperator root = phyPlan.getRoots().get(0);
1476
            PhysicalOperator root = phyPlan.getRoots().get(0);
1476
            phyPlan.disconnect(root, phyPlan.getSuccessors(root).get(0));
1477
            phyPlan.disconnect(root, phyPlan.getSuccessors(root).get(0));
1477
            phyPlan.remove(root);
1478
            phyPlan.remove(root);
[+20] [20] 4 lines
[+20] private FileSpec getIndexingJob(MapReduceOper indexerMROp,
1482
        POLoad idxJobLoader = getLoad();
1483
        POLoad idxJobLoader = getLoad();
1483
        idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
1484
        idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
1484
                new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
1485
                new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
1485
        indexerMROp.mapPlan.add(idxJobLoader);
1486
        indexerMROp.mapPlan.add(idxJobLoader);
1486
        indexerMROp.UDFs.add(baseLoader.getLFile().getFuncSpec().toString());
1487
        indexerMROp.UDFs.add(baseLoader.getLFile().getFuncSpec().toString());
1487
        
1488

   
1488
        // Loader of mro will return a tuple of form - 
1489
        // Loader of mro will return a tuple of form -
1489
        // (key1, key2, .. , WritableComparable, splitIndex). See MergeJoinIndexer for details.
1490
        // (key1, key2, .. , WritableComparable, splitIndex). See MergeJoinIndexer for details.
1490
        
1491

   
1491
        // After getting an index entry in each mapper, send all of them to one 
1492
        // After getting an index entry in each mapper, send all of them to one
1492
        // reducer where they will be sorted on the way by Hadoop.
1493
        // reducer where they will be sorted on the way by Hadoop.
1493
        MRUtil.simpleConnectMapToReduce(indexerMROp, scope, nig);
1494
        MRUtil.simpleConnectMapToReduce(indexerMROp, scope, nig);
1494
        
1495

   
1495
        indexerMROp.requestedParallelism = 1; // we need exactly one reducer for indexing job.
1496
        indexerMROp.requestedParallelism = 1; // we need exactly one reducer for indexing job.
1496
        
1497

   
1497
        // We want to use typed tuple comparator for this job, instead of default 
1498
        // We want to use typed tuple comparator for this job, instead of default
1498
        // raw binary comparator used by Pig, to make sure index entries are 
1499
        // raw binary comparator used by Pig, to make sure index entries are
1499
        // sorted correctly by Hadoop.
1500
        // sorted correctly by Hadoop.
1500
        indexerMROp.useTypedComparator(true); 
1501
        indexerMROp.useTypedComparator(true);
1501

    
   
1502

   
1502
        POStore st = getStore();
1503
        POStore st = getStore();
1503
        FileSpec strFile = getTempFileSpec();
1504
        FileSpec strFile = getTempFileSpec();
1504
        st.setSFile(strFile);
1505
        st.setSFile(strFile);
1505
        indexerMROp.reducePlan.addAsLeaf(st);
1506
        indexerMROp.reducePlan.addAsLeaf(st);
1506
        indexerMROp.setReduceDone(true);
1507
        indexerMROp.setReduceDone(true);
1507

    
   
1508

   
1508
        return strFile;
1509
        return strFile;
1509
    }
1510
    }
1510
    
1511

   
1511
    /** Since merge-join works on two inputs there are exactly two MROper predecessors identified  as left and right.
1512
    /** Since merge-join works on two inputs there are exactly two MROper predecessors identified  as left and right.
1512
     *  Instead of merging two operators, both are used to generate a MR job each. First MR oper is run to generate on-the-fly index on right side.
1513
     *  Instead of merging two operators, both are used to generate a MR job each. First MR oper is run to generate on-the-fly index on right side.
1513
     *  Second is used to actually do the join. First MR oper is identified as rightMROper and second as curMROper.
1514
     *  Second is used to actually do the join. First MR oper is identified as rightMROper and second as curMROper.
1514

    
   
1515

   
1515
     *  1) RightMROper: If it is in map phase. It can be preceded only by POLoad. If there is anything else
1516
     *  1) RightMROper: If it is in map phase. It can be preceded only by POLoad. If there is anything else
[+20] [20] 11 lines
[+20] [+] public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
1527
                int errCode=1101;
1528
                int errCode=1101;
1528
                throw new MRCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
1529
                throw new MRCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
1529
            }
1530
            }
1530

    
   
1531

   
1531
            curMROp = phyToMROpMap.get(joinOp.getInputs().get(0));
1532
            curMROp = phyToMROpMap.get(joinOp.getInputs().get(0));
1532
             
1533

   
1533
            MapReduceOper rightMROpr = null;
1534
            MapReduceOper rightMROpr = null;
1534
            if(curMROp.equals(compiledInputs[0]))
1535
            if(curMROp.equals(compiledInputs[0]))
1535
                rightMROpr = compiledInputs[1];
1536
                rightMROpr = compiledInputs[1];
1536
            else
1537
            else
1537
                rightMROpr = compiledInputs[0];
1538
                rightMROpr = compiledInputs[0];
1538
            
1539

   
1539
            // We will first operate on right side which is indexer job.
1540
            // We will first operate on right side which is indexer job.
1540
            // First yank plan of the compiled right input and set that as an inner plan of right operator.
1541
            // First yank plan of the compiled right input and set that as an inner plan of right operator.
1541
            PhysicalPlan rightPipelinePlan;
1542
            PhysicalPlan rightPipelinePlan;
1542
            if(!rightMROpr.mapDone){
1543
            if(!rightMROpr.mapDone){
1543
                PhysicalPlan rightMapPlan = rightMROpr.mapPlan;
1544
                PhysicalPlan rightMapPlan = rightMROpr.mapPlan;
1544
                if(rightMapPlan.getRoots().size() != 1){
1545
                if(rightMapPlan.getRoots().size() != 1){
1545
                    int errCode = 2171;
1546
                    int errCode = 2171;
1546
                    String errMsg = "Expected one but found more then one root physical operator in physical plan.";
1547
                    String errMsg = "Expected one but found more then one root physical operator in physical plan.";
1547
                    throw new MRCompilerException(errMsg,errCode,PigException.BUG);
1548
                    throw new MRCompilerException(errMsg,errCode,PigException.BUG);
1548
                }
1549
                }
1549
                
1550

   
1550
                PhysicalOperator rightLoader = rightMapPlan.getRoots().get(0);
1551
                PhysicalOperator rightLoader = rightMapPlan.getRoots().get(0);
1551
                if(! (rightLoader instanceof POLoad)){
1552
                if(! (rightLoader instanceof POLoad)){
1552
                    int errCode = 2172;
1553
                    int errCode = 2172;
1553
                    String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightLoader.getClass().getCanonicalName();
1554
                    String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightLoader.getClass().getCanonicalName();
1554
                    throw new MRCompilerException(errMsg,errCode);
1555
                    throw new MRCompilerException(errMsg,errCode);
1555
                }
1556
                }
1556
                
1557

   
1557
                if (rightMapPlan.getSuccessors(rightLoader) == null || rightMapPlan.getSuccessors(rightLoader).isEmpty())
1558
                if (rightMapPlan.getSuccessors(rightLoader) == null || rightMapPlan.getSuccessors(rightLoader).isEmpty())
1558
                    // Load - Join case.
1559
                    // Load - Join case.
1559
                    rightPipelinePlan = null; 
1560
                    rightPipelinePlan = null;
1560
                
1561

   
1561
                else{ // We got something on right side. Yank it and set it as inner plan of right input.
1562
                else{ // We got something on right side. Yank it and set it as inner plan of right input.
1562
                    rightPipelinePlan = rightMapPlan.clone();
1563
                    rightPipelinePlan = rightMapPlan.clone();
1563
                    PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
1564
                    PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
1564
                    rightPipelinePlan.disconnect(root, rightPipelinePlan.getSuccessors(root).get(0));
1565
                    rightPipelinePlan.disconnect(root, rightPipelinePlan.getSuccessors(root).get(0));
1565
                    rightPipelinePlan.remove(root);
1566
                    rightPipelinePlan.remove(root);
1566
                    rightMapPlan.trimBelow(rightLoader);
1567
                    rightMapPlan.trimBelow(rightLoader);
1567
                }
1568
                }
1568
            }
1569
            }
1569
            
1570

   
1570
            else if(!rightMROpr.reduceDone){ 
1571
            else if(!rightMROpr.reduceDone){
1571
                // Indexer must run in map. If we are in reduce, close it and start new MROper.
1572
                // Indexer must run in map. If we are in reduce, close it and start new MROper.
1572
                // No need of yanking in this case. Since we are starting brand new MR Operator and it will contain nothing.
1573
                // No need of yanking in this case. Since we are starting brand new MR Operator and it will contain nothing.
1573
                POStore rightStore = getStore();
1574
                POStore rightStore = getStore();
1574
                FileSpec rightStrFile = getTempFileSpec();
1575
                FileSpec rightStrFile = getTempFileSpec();
1575
                rightStore.setSFile(rightStrFile);
1576
                rightStore.setSFile(rightStrFile);
1576
                rightMROpr.reducePlan.addAsLeaf(rightStore);
1577
                rightMROpr.reducePlan.addAsLeaf(rightStore);
1577
                rightMROpr.setReduceDone(true);
1578
                rightMROpr.setReduceDone(true);
1578
                rightMROpr = startNew(rightStrFile, rightMROpr);
1579
                rightMROpr = startNew(rightStrFile, rightMROpr);
1579
                rightPipelinePlan = null; 
1580
                rightPipelinePlan = null;
1580
            }
1581
            }
1581
            
1582

   
1582
            else{
1583
            else{
1583
                int errCode = 2022;
1584
                int errCode = 2022;
1584
                String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
1585
                String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
1585
                throw new PlanException(msg, errCode, PigException.BUG);
1586
                throw new PlanException(msg, errCode, PigException.BUG);
1586
            }
1587
            }
1587
            
1588

   
1588
            joinOp.setupRightPipeline(rightPipelinePlan);
1589
            joinOp.setupRightPipeline(rightPipelinePlan);
1589
            rightMROpr.requestedParallelism = 1; // we need exactly one reducer for indexing job.        
1590
            rightMROpr.requestedParallelism = 1; // we need exactly one reducer for indexing job.
1590
            
1591

   
1591
            // At this point, we must be operating on map plan of right input and it would contain nothing else other then a POLoad.
1592
            // At this point, we must be operating on map plan of right input and it would contain nothing else other then a POLoad.
1592
            POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
1593
            POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
1593
            joinOp.setSignature(rightLoader.getSignature());
1594
            joinOp.setSignature(rightLoader.getSignature());
1594
            LoadFunc rightLoadFunc = rightLoader.getLoadFunc();
1595
            LoadFunc rightLoadFunc = rightLoader.getLoadFunc();
1595
            List<String> udfs = new ArrayList<String>();
1596
            List<String> udfs = new ArrayList<String>();
1596
            if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
1597
            if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
1597
                joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
1598
                joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
1598
                joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
1599
                joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
1599
                udfs.add(rightLoader.getLFile().getFuncSpec().toString());
1600
                udfs.add(rightLoader.getLFile().getFuncSpec().toString());
1600
                
1601

   
1601
                // we don't need the right MROper since
1602
                // we don't need the right MROper since
1602
                // the right loader is an IndexableLoadFunc which can handle the index
1603
                // the right loader is an IndexableLoadFunc which can handle the index
1603
                // itself
1604
                // itself
1604
                MRPlan.remove(rightMROpr);
1605
                MRPlan.remove(rightMROpr);
1605
                if(rightMROpr == compiledInputs[0]) {
1606
                if(rightMROpr == compiledInputs[0]) {
1606
                    compiledInputs[0] = null;
1607
                    compiledInputs[0] = null;
1607
                } else if(rightMROpr == compiledInputs[1]) {
1608
                } else if(rightMROpr == compiledInputs[1]) {
1608
                    compiledInputs[1] = null;
1609
                    compiledInputs[1] = null;
1609
                } 
1610
                }
1610
                rightMROpr = null;
1611
                rightMROpr = null;
1611
                
1612

   
1612
                // validate that the join keys in merge join are only                                                                                                                                                                              
1613
                // validate that the join keys in merge join are only
1613
                // simple column projections or '*' and not expression - expressions                                                                                                                                                               
1614
                // simple column projections or '*' and not expression - expressions
1614
                // cannot be handled when the index is built by the storage layer on the sorted                                                                                                                                                    
1615
                // cannot be handled when the index is built by the storage layer on the sorted
1615
                // data when the sorted data (and corresponding index) is written.                                                                                                                                                                 
1616
                // data when the sorted data (and corresponding index) is written.
1616
                // So merge join will be restricted not have expressions as                                                                                                                                                                        
1617
                // So merge join will be restricted not have expressions as
1617
                // join keys      
1618
                // join keys
1618
                int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
1619
                int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
1619
                for(int i = 0; i < numInputs; i++) {
1620
                for(int i = 0; i < numInputs; i++) {
1620
                    List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
1621
                    List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
1621
                    for (PhysicalPlan keyPlan : keyPlans) {
1622
                    for (PhysicalPlan keyPlan : keyPlans) {
1622
                        for(PhysicalOperator op : keyPlan) {
1623
                        for(PhysicalOperator op : keyPlan) {
[+20] [20] 6 lines
[+20] public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
1629
                        }
1630
                        }
1630
                    }
1631
                    }
1631
                }
1632
                }
1632
            } else {
1633
            } else {
1633
                LoadFunc loadFunc = rightLoader.getLoadFunc();
1634
                LoadFunc loadFunc = rightLoader.getLoadFunc();
1634
                //Replacing POLoad with indexer is disabled for 'merge-sparse' joins.  While 
1635
                //Replacing POLoad with indexer is disabled for 'merge-sparse' joins.  While
1635
                //this feature would be useful, the current implementation of DefaultIndexableLoader
1636
                //this feature would be useful, the current implementation of DefaultIndexableLoader
1636
                //is not designed to handle multiple calls to seekNear.  Specifically, it rereads the entire index
1637
                //is not designed to handle multiple calls to seekNear.  Specifically, it rereads the entire index
1637
                //for each call.  Some refactoring of this class is required - and then the check below could be removed.
1638
                //for each call.  Some refactoring of this class is required - and then the check below could be removed.
1638
		if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
1639
		if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
1639
                    int errCode = 1104;
1640
                    int errCode = 1104;
1640
                    String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " +
1641
                    String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " +
1641
                    "The specified loader " + loadFunc + " doesn't implement it";
1642
                    "The specified loader " + loadFunc + " doesn't implement it";
1642
                    throw new MRCompilerException(errMsg,errCode);
1643
                    throw new MRCompilerException(errMsg,errCode);
1643
		}
1644
		}
1644
                
1645

   
1645
                // Replace POLoad with  indexer.
1646
                // Replace POLoad with  indexer.
1646

    
   
1647

   
1647
                if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
1648
                if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
1648
                    int errCode = 1104;
1649
                    int errCode = 1104;
1649
                    String errMsg = "Right input of merge-join must implement " +
1650
                    String errMsg = "Right input of merge-join must implement " +
1650
                    "OrderedLoadFunc interface. The specified loader " 
1651
                    "OrderedLoadFunc interface. The specified loader "
1651
                    + loadFunc + " doesn't implement it";
1652
                    + loadFunc + " doesn't implement it";
1652
                    throw new MRCompilerException(errMsg,errCode);
1653
                    throw new MRCompilerException(errMsg,errCode);
1653
                }
1654
                }
1654

    
   
1655

   
1655
                String[] indexerArgs = new String[6];
1656
                String[] indexerArgs = new String[6];
[+20] [20] 4 lines
[+20] public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
1660
                indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
1661
                indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
1661
                indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
1662
                indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
1662
                indexerArgs[3] = rightLoader.getSignature();
1663
                indexerArgs[3] = rightLoader.getSignature();
1663
                indexerArgs[4] = rightLoader.getOperatorKey().scope;
1664
                indexerArgs[4] = rightLoader.getOperatorKey().scope;
1664
                indexerArgs[5] = Boolean.toString(true);
1665
                indexerArgs[5] = Boolean.toString(true);
1665
                
1666

   
1666
                FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
1667
                FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
1667
                rightLoader.setLFile(lFile);
1668
                rightLoader.setLFile(lFile);
1668
    
1669

   
1669
                // Loader of mro will return a tuple of form - 
1670
                // Loader of mro will return a tuple of form -
1670
                // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
1671
                // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
1671

    
   
1672

   
1672
                MRUtil.simpleConnectMapToReduce(rightMROpr, scope, nig);
1673
                MRUtil.simpleConnectMapToReduce(rightMROpr, scope, nig);
1673
                rightMROpr.useTypedComparator(true);
1674
                rightMROpr.useTypedComparator(true);
1674
                
1675

   
1675
                POStore st = getStore();
1676
                POStore st = getStore();
1676
                FileSpec strFile = getTempFileSpec();
1677
                FileSpec strFile = getTempFileSpec();
1677
                st.setSFile(strFile);
1678
                st.setSFile(strFile);
1678
                rightMROpr.reducePlan.addAsLeaf(st);
1679
                rightMROpr.reducePlan.addAsLeaf(st);
1679
                rightMROpr.setReduceDone(true);
1680
                rightMROpr.setReduceDone(true);
1680
                
1681

   
1681
                // set up the DefaultIndexableLoader for the join operator
1682
                // set up the DefaultIndexableLoader for the join operator
1682
                String[] defaultIndexableLoaderArgs = new String[5];
1683
                String[] defaultIndexableLoaderArgs = new String[5];
1683
                defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
1684
                defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
1684
                defaultIndexableLoaderArgs[1] = strFile.getFileName();
1685
                defaultIndexableLoaderArgs[1] = strFile.getFileName();
1685
                defaultIndexableLoaderArgs[2] = strFile.getFuncSpec().toString();
1686
                defaultIndexableLoaderArgs[2] = strFile.getFuncSpec().toString();
1686
                defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
1687
                defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
1687
                defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName();
1688
                defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName();
1688
                joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
1689
                joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
1689
                joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());  
1690
                joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
1690
                
1691

   
1691
                joinOp.setIndexFile(strFile.getFileName());
1692
                joinOp.setIndexFile(strFile.getFileName());
1692
                udfs.add(origRightLoaderFileSpec.getFuncSpec().toString());
1693
                udfs.add(origRightLoaderFileSpec.getFuncSpec().toString());
1693
            }
1694
            }
1694
            
1695

   
1695
            // We are done with right side. Lets work on left now.
1696
            // We are done with right side. Lets work on left now.
1696
            // Join will be materialized in leftMROper.
1697
            // Join will be materialized in leftMROper.
1697
            if(!curMROp.mapDone) // Life is easy 
1698
            if(!curMROp.mapDone) // Life is easy
1698
                curMROp.mapPlan.addAsLeaf(joinOp);
1699
                curMROp.mapPlan.addAsLeaf(joinOp);
1699
            
1700

   
1700
            else if(!curMROp.reduceDone){  // This is a map-side join. Close this MROper and start afresh.
1701
            else if(!curMROp.reduceDone){  // This is a map-side join. Close this MROper and start afresh.
1701
                POStore leftStore = getStore();
1702
                POStore leftStore = getStore();
1702
                FileSpec leftStrFile = getTempFileSpec();
1703
                FileSpec leftStrFile = getTempFileSpec();
1703
                leftStore.setSFile(leftStrFile);
1704
                leftStore.setSFile(leftStrFile);
1704
                curMROp.reducePlan.addAsLeaf(leftStore);
1705
                curMROp.reducePlan.addAsLeaf(leftStore);
1705
                curMROp.setReduceDone(true);
1706
                curMROp.setReduceDone(true);
1706
                curMROp = startNew(leftStrFile, curMROp);
1707
                curMROp = startNew(leftStrFile, curMROp);
1707
                curMROp.mapPlan.addAsLeaf(joinOp);
1708
                curMROp.mapPlan.addAsLeaf(joinOp);
1708
            }
1709
            }
1709
            
1710

   
1710
            else{
1711
            else{
1711
                int errCode = 2022;
1712
                int errCode = 2022;
1712
                String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
1713
                String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
1713
                throw new PlanException(msg, errCode, PigException.BUG);
1714
                throw new PlanException(msg, errCode, PigException.BUG);
1714
            }
1715
            }
[+20] [20] 31 lines
[+20] [+] public void visitDistinct(PODistinct op) throws VisitorException {
1746
            PhysicalPlan ep = new PhysicalPlan();
1747
            PhysicalPlan ep = new PhysicalPlan();
1747
            POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
1748
            POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
1748
            prjStar.setResultType(DataType.TUPLE);
1749
            prjStar.setResultType(DataType.TUPLE);
1749
            prjStar.setStar(true);
1750
            prjStar.setStar(true);
1750
            ep.add(prjStar);
1751
            ep.add(prjStar);
1751
            
1752

   
1752
            List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
1753
            List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
1753
            eps.add(ep);
1754
            eps.add(ep);
1754
            
1755

   
1755
            POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
1756
            POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
1756
            lr.setIndex(0);
1757
            lr.setIndex(0);
1757
            lr.setKeyType(DataType.TUPLE);
1758
            lr.setKeyType(DataType.TUPLE);
1758
            lr.setPlans(eps);
1759
            lr.setPlans(eps);
1759
            lr.setResultType(DataType.TUPLE);
1760
            lr.setResultType(DataType.TUPLE);
1760
            lr.setDistinct(true);
1761
            lr.setDistinct(true);
1761
            
1762

   
1762
            addToMap(lr);
1763
            addToMap(lr);
1763
            
1764

   
1764
            blocking(op);
1765
            blocking(op);
1765
            
1766

   
1766
            POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
1767
            POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
1767
            pkg.setKeyType(DataType.TUPLE);
1768
            pkg.setKeyType(DataType.TUPLE);
1768
            pkg.setDistinct(true);
1769
            pkg.setDistinct(true);
1769
            pkg.setNumInps(1);
1770
            pkg.setNumInps(1);
1770
            boolean[] inner = {false}; 
1771
            boolean[] inner = {false};
1771
            pkg.setInner(inner);
1772
            pkg.setInner(inner);
1772
            curMROp.reducePlan.add(pkg);
1773
            curMROp.reducePlan.add(pkg);
1773
            
1774

   
1774
            List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
1775
            List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
1775
            List<Boolean> flat1 = new ArrayList<Boolean>();
1776
            List<Boolean> flat1 = new ArrayList<Boolean>();
1776
            PhysicalPlan ep1 = new PhysicalPlan();
1777
            PhysicalPlan ep1 = new PhysicalPlan();
1777
            POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
1778
            POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
1778
            prj1.setResultType(DataType.TUPLE);
1779
            prj1.setResultType(DataType.TUPLE);
[+20] [20] 15 lines
[+20] public void visitDistinct(PODistinct op) throws VisitorException {
1794
            int errCode = 2034;
1795
            int errCode = 2034;
1795
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
1796
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
1796
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
1797
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
1797
        }
1798
        }
1798
    }
1799
    }
1799
    
1800

   
1800
    @Override
1801
    @Override
1801
    public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
1802
    public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
1802
		try {
1803
		try {
1803
			if (compiledInputs.length != 2) {
1804
			if (compiledInputs.length != 2) {
1804
				int errCode = 2255;
1805
				int errCode = 2255;
1805
				throw new VisitorException("POSkewedJoin operator has " + compiledInputs.length + " inputs. It should have 2.", errCode);
1806
				throw new VisitorException("POSkewedJoin operator has " + compiledInputs.length + " inputs. It should have 2.", errCode);
1806
			}
1807
			}
1807
			
1808

   
1808
			//change plan to store the first join input into a temp file
1809
			//change plan to store the first join input into a temp file
1809
			FileSpec fSpec = getTempFileSpec();
1810
			FileSpec fSpec = getTempFileSpec();
1810
			MapReduceOper mro = compiledInputs[0];
1811
			MapReduceOper mro = compiledInputs[0];
1811
			POStore str = getStore();
1812
			POStore str = getStore();
1812
			str.setSFile(fSpec);
1813
			str.setSFile(fSpec);
[+20] [20] 6 lines
[+20] public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
1819
			} else {
1820
			} else {
1820
				int errCode = 2022;
1821
				int errCode = 2022;
1821
				String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
1822
				String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
1822
				throw new PlanException(msg, errCode, PigException.BUG);
1823
				throw new PlanException(msg, errCode, PigException.BUG);
1823
			}
1824
			}
1824
			
1825

   
1825
			FileSpec partitionFile = getTempFileSpec();
1826
			FileSpec partitionFile = getTempFileSpec();
1826
			int rp = op.getRequestedParallelism();
1827
			int rp = op.getRequestedParallelism();
1827
			
1828

   
1828
			Pair<MapReduceOper, Integer> sampleJobPair = getSkewedJoinSampleJob(op, mro, fSpec, partitionFile, rp);            
1829
			Pair<MapReduceOper, Integer> sampleJobPair = getSkewedJoinSampleJob(op, mro, fSpec, partitionFile, rp);
1829
			rp = sampleJobPair.second;
1830
			rp = sampleJobPair.second;
1830
			
1831

   
1831
			// set parallelism of SkewedJoin as the value calculated by sampling job
1832
			// set parallelism of SkewedJoin as the value calculated by sampling job
1832
			// if "parallel" is specified in join statement, "rp" is equal to that number
1833
			// if "parallel" is specified in join statement, "rp" is equal to that number
1833
			// if not specified, use the value that sampling process calculated
1834
			// if not specified, use the value that sampling process calculated
1834
			// based on default.
1835
			// based on default.
1835
			op.setRequestedParallelism(rp);
1836
			op.setRequestedParallelism(rp);
1836
						
1837

   
1837
			// load the temp file for first table as input of join            
1838
			// load the temp file for first table as input of join
1838
			MapReduceOper[] joinInputs = new MapReduceOper[] {startNew(fSpec, sampleJobPair.first), compiledInputs[1]};            
1839
			MapReduceOper[] joinInputs = new MapReduceOper[] {startNew(fSpec, sampleJobPair.first), compiledInputs[1]};
1839
			MapReduceOper[] rearrangeOutputs = new MapReduceOper[2];                       
1840
			MapReduceOper[] rearrangeOutputs = new MapReduceOper[2];
1840
			
1841

   
1841
			compiledInputs = new MapReduceOper[] {joinInputs[0]};
1842
			compiledInputs = new MapReduceOper[] {joinInputs[0]};
1842
			// run POLocalRearrange for first join table
1843
			// run POLocalRearrange for first join table
1843
			POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);            
1844
			POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
1844
			try {
1845
			try {
1845
				lr.setIndex(0);                
1846
				lr.setIndex(0);
1846
			} catch (ExecException e) {
1847
			} catch (ExecException e) {
1847
				int errCode = 2058;
1848
				int errCode = 2058;
1848
				String msg = "Unable to set index on newly created POLocalRearrange.";
1849
				String msg = "Unable to set index on newly created POLocalRearrange.";
1849
				throw new PlanException(msg, errCode, PigException.BUG, e);
1850
				throw new PlanException(msg, errCode, PigException.BUG, e);
1850
			}
1851
			}
1851
			
1852

   
1852
			List<PhysicalOperator> l = plan.getPredecessors(op);
1853
			List<PhysicalOperator> l = plan.getPredecessors(op);
1853
			MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
1854
			MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
1854
			List<PhysicalPlan> groups = joinPlans.get(l.get(0));
1855
			List<PhysicalPlan> groups = joinPlans.get(l.get(0));
1855
			// check the type of group keys, if there are more than one field, the key is TUPLE.
1856
			// check the type of group keys, if there are more than one field, the key is TUPLE.
1856
			byte type = DataType.TUPLE;
1857
			byte type = DataType.TUPLE;
1857
			if (groups.size() == 1) {
1858
			if (groups.size() == 1) {
1858
				type = groups.get(0).getLeaves().get(0).getResultType();                
1859
				type = groups.get(0).getLeaves().get(0).getResultType();
1859
			}               
1860
			}
1860
			
1861

   
1861
			lr.setKeyType(type);            
1862
			lr.setKeyType(type);
1862
			lr.setPlans(groups);
1863
			lr.setPlans(groups);
1863
			lr.setResultType(DataType.TUPLE);
1864
			lr.setResultType(DataType.TUPLE);
1864
			
1865

   
1865
			lr.visit(this);
1866
			lr.visit(this);
1866
			if(lr.getRequestedParallelism() > curMROp.requestedParallelism)
1867
			if(lr.getRequestedParallelism() > curMROp.requestedParallelism)
1867
				curMROp.requestedParallelism = lr.getRequestedParallelism();
1868
				curMROp.requestedParallelism = lr.getRequestedParallelism();
1868
			rearrangeOutputs[0] = curMROp;
1869
			rearrangeOutputs[0] = curMROp;
1869
			
1870

   
1870
			compiledInputs = new MapReduceOper[] {joinInputs[1]};       
1871
			compiledInputs = new MapReduceOper[] {joinInputs[1]};
1871
			// if the map for current input is already closed, then start a new job
1872
			// if the map for current input is already closed, then start a new job
1872
			if (compiledInputs[0].isMapDone() && !compiledInputs[0].isReduceDone()) {
1873
			if (compiledInputs[0].isMapDone() && !compiledInputs[0].isReduceDone()) {
1873
				FileSpec f = getTempFileSpec();
1874
				FileSpec f = getTempFileSpec();
1874
				POStore s = getStore();
1875
				POStore s = getStore();
1875
				s.setSFile(f);
1876
				s.setSFile(f);
1876
				compiledInputs[0].reducePlan.addAsLeaf(s);
1877
				compiledInputs[0].reducePlan.addAsLeaf(s);
1877
				compiledInputs[0].setReduceDone(true);
1878
				compiledInputs[0].setReduceDone(true);
1878
				compiledInputs[0] = startNew(f, compiledInputs[0]);
1879
				compiledInputs[0] = startNew(f, compiledInputs[0]);
1879
			}     		      
1880
			}
1880
			
1881

   
1881
			// run POPartitionRearrange for second join table
1882
			// run POPartitionRearrange for second join table
1882
			POPartitionRearrange pr = 
1883
			POPartitionRearrange pr =
1883
			    new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
1884
			    new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
1884
			pr.setPigContext(pigContext);
1885
			pr.setPigContext(pigContext);
1885
			lr = pr;
1886
			lr = pr;
1886
			try {
1887
			try {
1887
				lr.setIndex(1);
1888
				lr.setIndex(1);
1888
			} catch (ExecException e) {
1889
			} catch (ExecException e) {
1889
				int errCode = 2058;
1890
				int errCode = 2058;
1890
				String msg = "Unable to set index on newly created POLocalRearrange.";
1891
				String msg = "Unable to set index on newly created POLocalRearrange.";
1891
				throw new PlanException(msg, errCode, PigException.BUG, e);
1892
				throw new PlanException(msg, errCode, PigException.BUG, e);
1892
			}               
1893
			}
1893
			
1894

   
1894
			groups = joinPlans.get(l.get(1));
1895
			groups = joinPlans.get(l.get(1));
1895
			lr.setPlans(groups);
1896
			lr.setPlans(groups);
1896
			lr.setKeyType(type);            
1897
			lr.setKeyType(type);
1897
			lr.setResultType(DataType.BAG);
1898
			lr.setResultType(DataType.BAG);
1898
			
1899

   
1899
			lr.visit(this);
1900
			lr.visit(this);
1900
			if(lr.getRequestedParallelism() > curMROp.requestedParallelism)
1901
			if(lr.getRequestedParallelism() > curMROp.requestedParallelism)
1901
				curMROp.requestedParallelism = lr.getRequestedParallelism();
1902
				curMROp.requestedParallelism = lr.getRequestedParallelism();
1902
			rearrangeOutputs[1] = curMROp;                     
1903
			rearrangeOutputs[1] = curMROp;
1903
			compiledInputs = rearrangeOutputs;
1904
			compiledInputs = rearrangeOutputs;
1904
					   
1905

   
1905
			
1906

   
1906
			// create POGlobalRearrange
1907
			// create POGlobalRearrange
1907
			POGlobalRearrange gr = new POGlobalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
1908
			POGlobalRearrange gr = new POGlobalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
1908
			// Skewed join has its own special partitioner 
1909
			// Skewed join has its own special partitioner
1909
			gr.setResultType(DataType.TUPLE);
1910
			gr.setResultType(DataType.TUPLE);
1910
			gr.visit(this);
1911
			gr.visit(this);
1911
			if(gr.getRequestedParallelism() > curMROp.requestedParallelism)
1912
			if(gr.getRequestedParallelism() > curMROp.requestedParallelism)
1912
				curMROp.requestedParallelism = gr.getRequestedParallelism();
1913
				curMROp.requestedParallelism = gr.getRequestedParallelism();
1913
			compiledInputs = new MapReduceOper[] {curMROp};
1914
			compiledInputs = new MapReduceOper[] {curMROp};
1914
			
1915

   
1915
			// create POPakcage
1916
			// create POPakcage
1916
			POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
1917
			POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
1917
			pkg.setKeyType(type);
1918
			pkg.setKeyType(type);
1918
			pkg.setResultType(DataType.TUPLE);
1919
			pkg.setResultType(DataType.TUPLE);
1919
			pkg.setNumInps(2);
1920
			pkg.setNumInps(2);
1920
			boolean [] inner = op.getInnerFlags();
1921
			boolean [] inner = op.getInnerFlags();
1921
			pkg.setInner(inner);            
1922
			pkg.setInner(inner);
1922
			pkg.visit(this);       
1923
			pkg.visit(this);
1923
			compiledInputs = new MapReduceOper[] {curMROp};
1924
			compiledInputs = new MapReduceOper[] {curMROp};
1924
			
1925

   
1925
			// create POForEach
1926
			// create POForEach
1926
			List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
1927
			List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
1927
			List<Boolean> flat = new ArrayList<Boolean>();
1928
			List<Boolean> flat = new ArrayList<Boolean>();
1928
			
1929

   
1929
			PhysicalPlan ep;
1930
			PhysicalPlan ep;
1930
			// Add corresponding POProjects
1931
			// Add corresponding POProjects
1931
			for (int i=0; i < 2; i++ ) {
1932
			for (int i=0; i < 2; i++ ) {
1932
			    ep = new PhysicalPlan();
1933
			    ep = new PhysicalPlan();
1933
			    POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
1934
			    POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
[+20] [20] 9 lines
[+20] public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
1943
			    flat.add(true);
1944
			    flat.add(true);
1944
			}
1945
			}
1945

    
   
1946

   
1946
			POForEach fe = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, eps, flat);
1947
			POForEach fe = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, eps, flat);
1947
			fe.setResultType(DataType.TUPLE);
1948
			fe.setResultType(DataType.TUPLE);
1948
			
1949

   
1949
			fe.visit(this);
1950
			fe.visit(this);
1950
			
1951

   
1951
			curMROp.setSkewedJoinPartitionFile(partitionFile.getFileName());
1952
			curMROp.setSkewedJoinPartitionFile(partitionFile.getFileName());
1952
			phyToMROpMap.put(op, curMROp);
1953
			phyToMROpMap.put(op, curMROp);
1953
        }catch(PlanException e) {
1954
        }catch(PlanException e) {
1954
            int errCode = 2034;
1955
            int errCode = 2034;
1955
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
1956
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
[+20] [20] 12 lines
[+20] [+] public void visitSort(POSort op) throws VisitorException {
1968
            FileSpec fSpec = getTempFileSpec();
1969
            FileSpec fSpec = getTempFileSpec();
1969
            MapReduceOper mro = endSingleInputPlanWithStr(fSpec);
1970
            MapReduceOper mro = endSingleInputPlanWithStr(fSpec);
1970
            FileSpec quantFile = getTempFileSpec();
1971
            FileSpec quantFile = getTempFileSpec();
1971
            int rp = op.getRequestedParallelism();
1972
            int rp = op.getRequestedParallelism();
1972
            Pair<POProject, Byte>[] fields = getSortCols(op.getSortPlans());
1973
            Pair<POProject, Byte>[] fields = getSortCols(op.getSortPlans());
1973
            Pair<MapReduceOper, Integer> quantJobParallelismPair = 
1974
            Pair<MapReduceOper, Integer> quantJobParallelismPair =
1974
                getQuantileJob(op, mro, fSpec, quantFile, rp);
1975
                getQuantileJob(op, mro, fSpec, quantFile, rp);
1975
            curMROp = getSortJob(op, quantJobParallelismPair.first, fSpec, quantFile, 
1976
            curMROp = getSortJob(op, quantJobParallelismPair.first, fSpec, quantFile,
1976
                    quantJobParallelismPair.second, fields);
1977
                    quantJobParallelismPair.second, fields);
1977
            
1978

   
1978
            if(op.isUDFComparatorUsed){
1979
            if(op.isUDFComparatorUsed){
1979
                curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
1980
                curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
1980
                curMROp.isUDFComparatorUsed = true;
1981
                curMROp.isUDFComparatorUsed = true;
1981
            }
1982
            }
1982
            phyToMROpMap.put(op, curMROp);
1983
            phyToMROpMap.put(op, curMROp);
[+20] [20] 91 lines
[+20] [+] public void visitRank(PORank op) throws VisitorException {
2074
        }
2075
        }
2075
        int errCode = 2026;
2076
        int errCode = 2026;
2076
        String msg = "No expression plan found in POSort.";
2077
        String msg = "No expression plan found in POSort.";
2077
        throw new PlanException(msg, errCode, PigException.BUG);
2078
        throw new PlanException(msg, errCode, PigException.BUG);
2078
    }
2079
    }
2079
    
2080

   
2080
    private MapReduceOper getSortJob(
2081
    private MapReduceOper getSortJob(
2081
            POSort sort,
2082
            POSort sort,
2082
            MapReduceOper quantJob,
2083
            MapReduceOper quantJob,
2083
            FileSpec lFile,
2084
            FileSpec lFile,
2084
            FileSpec quantFile,
2085
            FileSpec quantFile,
[+20] [20] 4 lines
[+20] private MapReduceOper getSortJob(
2089
        mro.setGlobalSort(true);
2090
        mro.setGlobalSort(true);
2090
        mro.requestedParallelism = rp;
2091
        mro.requestedParallelism = rp;
2091

    
   
2092

   
2092
        long limit = sort.getLimit();
2093
        long limit = sort.getLimit();
2093
        mro.limit = limit;
2094
        mro.limit = limit;
2094
        
2095

   
2095
        List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
2096
        List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
2096

    
   
2097

   
2097
        byte keyType = DataType.UNKNOWN;
2098
        byte keyType = DataType.UNKNOWN;
2098
        
2099

   
2099
        boolean[] sortOrder;
2100
        boolean[] sortOrder;
2100

    
   
2101

   
2101
        List<Boolean> sortOrderList = sort.getMAscCols();
2102
        List<Boolean> sortOrderList = sort.getMAscCols();
2102
        if(sortOrderList != null) {
2103
        if(sortOrderList != null) {
2103
            sortOrder = new boolean[sortOrderList.size()];
2104
            sortOrder = new boolean[sortOrderList.size()];
[+20] [20] 41 lines
[+20] private MapReduceOper getSortJob(
2145
                int errCode = 2035;
2146
                int errCode = 2035;
2146
                String msg = "Internal error. Could not compute key type of sort operator.";
2147
                String msg = "Internal error. Could not compute key type of sort operator.";
2147
                throw new PlanException(msg, errCode, PigException.BUG, ve);
2148
                throw new PlanException(msg, errCode, PigException.BUG, ve);
2148
            }
2149
            }
2149
        }
2150
        }
2150
        
2151

   
2151
        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
2152
        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
2152
        try {
2153
        try {
2153
            lr.setIndex(0);
2154
            lr.setIndex(0);
2154
        } catch (ExecException e) {
2155
        } catch (ExecException e) {
2155
        	int errCode = 2058;
2156
        	int errCode = 2058;
[+20] [20] 4 lines
[+20] private MapReduceOper getSortJob(
2160
            keyType);
2161
            keyType);
2161
        lr.setPlans(eps1);
2162
        lr.setPlans(eps1);
2162
        lr.setResultType(DataType.TUPLE);
2163
        lr.setResultType(DataType.TUPLE);
2163
        lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
2164
        lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
2164
        mro.mapPlan.addAsLeaf(lr);
2165
        mro.mapPlan.addAsLeaf(lr);
2165
        
2166

   
2166
        mro.setMapDone(true);
2167
        mro.setMapDone(true);
2167
        
2168

   
2168
        if (limit!=-1) {
2169
        if (limit!=-1) {
2169
        	POPackageLite pkg_c = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
2170
        	POPackageLite pkg_c = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
2170
        	pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
2171
        	pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
2171
            pkg_c.setNumInps(1);
2172
            pkg_c.setNumInps(1);
2172
            //pkg.setResultType(DataType.TUPLE);            
2173
            //pkg.setResultType(DataType.TUPLE);
2173
            mro.combinePlan.add(pkg_c);
2174
            mro.combinePlan.add(pkg_c);
2174
        	
2175

   
2175
            List<PhysicalPlan> eps_c1 = new ArrayList<PhysicalPlan>();
2176
            List<PhysicalPlan> eps_c1 = new ArrayList<PhysicalPlan>();
2176
            List<Boolean> flat_c1 = new ArrayList<Boolean>();
2177
            List<Boolean> flat_c1 = new ArrayList<Boolean>();
2177
            PhysicalPlan ep_c1 = new PhysicalPlan();
2178
            PhysicalPlan ep_c1 = new PhysicalPlan();
2178
            POProject prj_c1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
2179
            POProject prj_c1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
2179
            prj_c1.setColumn(1);
2180
            prj_c1.setColumn(1);
2180
            prj_c1.setOverloaded(false);
2181
            prj_c1.setOverloaded(false);
2181
            prj_c1.setResultType(DataType.BAG);
2182
            prj_c1.setResultType(DataType.BAG);
2182
            ep_c1.add(prj_c1);
2183
            ep_c1.add(prj_c1);
2183
            eps_c1.add(ep_c1);
2184
            eps_c1.add(ep_c1);
2184
            flat_c1.add(true);
2185
            flat_c1.add(true);
2185
            POForEach fe_c1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), 
2186
            POForEach fe_c1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),
2186
            		-1, eps_c1, flat_c1);
2187
            		-1, eps_c1, flat_c1);
2187
            fe_c1.setResultType(DataType.TUPLE);
2188
            fe_c1.setResultType(DataType.TUPLE);
2188
            mro.combinePlan.addAsLeaf(fe_c1);
2189
            mro.combinePlan.addAsLeaf(fe_c1);
2189
            
2190

   
2190
            POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
2191
            POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
2191
        	pLimit.setLimit(limit);
2192
        	pLimit.setLimit(limit);
2192
        	mro.combinePlan.addAsLeaf(pLimit);
2193
        	mro.combinePlan.addAsLeaf(pLimit);
2193
            
2194

   
2194
            List<PhysicalPlan> eps_c2 = new ArrayList<PhysicalPlan>();
2195
            List<PhysicalPlan> eps_c2 = new ArrayList<PhysicalPlan>();
2195
            eps_c2.addAll(sort.getSortPlans());
2196
            eps_c2.addAll(sort.getSortPlans());
2196
        
2197

   
2197
	        POLocalRearrange lr_c2 = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
2198
	        POLocalRearrange lr_c2 = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
2198
	        try {
2199
	        try {
2199
                lr_c2.setIndex(0);
2200
                lr_c2.setIndex(0);
2200
            } catch (ExecException e) {
2201
            } catch (ExecException e) {
2201
            	int errCode = 2058;
2202
            	int errCode = 2058;
2202
            	String msg = "Unable to set index on newly created POLocalRearrange.";            	
2203
            	String msg = "Unable to set index on newly created POLocalRearrange.";
2203
                throw new PlanException(msg, errCode, PigException.BUG, e);
2204
                throw new PlanException(msg, errCode, PigException.BUG, e);
2204
            }
2205
            }
2205
	        lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
2206
	        lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
2206
	        lr_c2.setPlans(eps_c2);
2207
	        lr_c2.setPlans(eps_c2);
2207
	        lr_c2.setResultType(DataType.TUPLE);
2208
	        lr_c2.setResultType(DataType.TUPLE);
2208
	        mro.combinePlan.addAsLeaf(lr_c2);
2209
	        mro.combinePlan.addAsLeaf(lr_c2);
2209
        }
2210
        }
2210
        
2211

   
2211
        POPackageLite pkg = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
2212
        POPackageLite pkg = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
2212
        pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
2213
        pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
2213
            keyType);
2214
            keyType);
2214
        pkg.setNumInps(1);       
2215
        pkg.setNumInps(1);
2215
        mro.reducePlan.add(pkg);
2216
        mro.reducePlan.add(pkg);
2216
        
2217

   
2217
        PhysicalPlan ep = new PhysicalPlan();
2218
        PhysicalPlan ep = new PhysicalPlan();
2218
        POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
2219
        POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
2219
        prj.setColumn(1);
2220
        prj.setColumn(1);
2220
        prj.setOverloaded(false);
2221
        prj.setOverloaded(false);
2221
        prj.setResultType(DataType.BAG);
2222
        prj.setResultType(DataType.BAG);
[+20] [20] 22 lines
[+20] private MapReduceOper getSortJob(
2244
            POSort inpSort,
2245
            POSort inpSort,
2245
            MapReduceOper prevJob,
2246
            MapReduceOper prevJob,
2246
            FileSpec lFile,
2247
            FileSpec lFile,
2247
            FileSpec quantFile,
2248
            FileSpec quantFile,
2248
            int rp) throws PlanException, VisitorException {
2249
            int rp) throws PlanException, VisitorException {
2249
        
2250

   
2250
        POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
2251
        POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
2251
                .getRequestedParallelism(), null, inpSort.getSortPlans(),
2252
                .getRequestedParallelism(), null, inpSort.getSortPlans(),
2252
                inpSort.getMAscCols(), inpSort.getMSortFunc());
2253
                inpSort.getMAscCols(), inpSort.getMSortFunc());
2253
        sort.addOriginalLocation(inpSort.getAlias(), inpSort.getOriginalLocations());
2254
        sort.addOriginalLocation(inpSort.getAlias(), inpSort.getOriginalLocations());
2254
    	
2255

   
2255
    	// Turn the asc/desc array into an array of strings so that we can pass it
2256
    	// Turn the asc/desc array into an array of strings so that we can pass it
2256
        // to the FindQuantiles function.
2257
        // to the FindQuantiles function.
2257
        List<Boolean> ascCols = inpSort.getMAscCols();
2258
        List<Boolean> ascCols = inpSort.getMAscCols();
2258
        String[] ascs = new String[ascCols.size()];
2259
        String[] ascs = new String[ascCols.size()];
2259
        for (int i = 0; i < ascCols.size(); i++) ascs[i] = ascCols.get(i).toString();
2260
        for (int i = 0; i < ascCols.size(); i++) ascs[i] = ascCols.get(i).toString();
[+20] [20] 7 lines
[+20] private MapReduceOper getSortJob(
2267
            ctorArgs[0] = USER_COMPARATOR_MARKER + userComparatorFuncSpec;
2268
            ctorArgs[0] = USER_COMPARATOR_MARKER + userComparatorFuncSpec;
2268
            for(int j = 0; j < ascs.length; j++) {
2269
            for(int j = 0; j < ascs.length; j++) {
2269
                ctorArgs[j+1] = ascs[j];
2270
                ctorArgs[j+1] = ascs[j];
2270
            }
2271
            }
2271
        }
2272
        }
2272
        
2273

   
2273
        return getSamplingJob(sort, prevJob, null, lFile, quantFile, rp, null, FindQuantiles.class.getName(), ctorArgs, RandomSampleLoader.class.getName());
2274
        return getSamplingJob(sort, prevJob, null, lFile, quantFile, rp, null, FindQuantiles.class.getName(), ctorArgs, RandomSampleLoader.class.getName());
2274
    }
2275
    }
2275
    
2276

   
2276
    /**
2277
    /**
2277
     * Create Sampling job for skewed join.
2278
     * Create Sampling job for skewed join.
2278
     */
2279
     */
2279
    private Pair<MapReduceOper, Integer> getSkewedJoinSampleJob(POSkewedJoin op, MapReduceOper prevJob, 
2280
    private Pair<MapReduceOper, Integer> getSkewedJoinSampleJob(POSkewedJoin op, MapReduceOper prevJob,
2280
    		FileSpec lFile, FileSpec sampleFile, int rp ) throws PlanException, VisitorException {
2281
    		FileSpec lFile, FileSpec sampleFile, int rp ) throws PlanException, VisitorException {
2281
    	    	
2282

   
2282
    	MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
2283
    	MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
2283
    	
2284

   
2284
    	List<PhysicalOperator> l = plan.getPredecessors(op);
2285
    	List<PhysicalOperator> l = plan.getPredecessors(op);
2285
    	List<PhysicalPlan> groups = joinPlans.get(l.get(0));
2286
    	List<PhysicalPlan> groups = joinPlans.get(l.get(0));
2286
    	List<Boolean> ascCol = new ArrayList<Boolean>();
2287
    	List<Boolean> ascCol = new ArrayList<Boolean>();
2287
    	for(int i=0; i<groups.size(); i++) {    		    		
2288
    	for(int i=0; i<groups.size(); i++) {
2288
    		ascCol.add(false);
2289
    		ascCol.add(false);
2289
    	}
2290
    	}
2290
    	
2291

   
2291
    	POSort sort = new POSort(op.getOperatorKey(), op.getRequestedParallelism(), null, groups, ascCol, null);
2292
    	POSort sort = new POSort(op.getOperatorKey(), op.getRequestedParallelism(), null, groups, ascCol, null);
2292
    	
2293

   
2293
    	// set up transform plan to get keys and memory size of input tuples
2294
    	// set up transform plan to get keys and memory size of input tuples
2294
    	// it first adds all the plans to get key columns,
2295
    	// it first adds all the plans to get key columns,
2295
    	List<PhysicalPlan> transformPlans = new ArrayList<PhysicalPlan>(); 
2296
    	List<PhysicalPlan> transformPlans = new ArrayList<PhysicalPlan>();
2296
    	transformPlans.addAll(groups);
2297
    	transformPlans.addAll(groups);
2297
        
2298

   
2298
    	// then it adds a column for memory size
2299
    	// then it adds a column for memory size
2299
    	POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
2300
    	POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
2300
        prjStar.setResultType(DataType.TUPLE);
2301
        prjStar.setResultType(DataType.TUPLE);
2301
        prjStar.setStar(true);            
2302
        prjStar.setStar(true);
2302
        
2303

   
2303
        List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
2304
        List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
2304
        ufInps.add(prjStar);
2305
        ufInps.add(prjStar);
2305
        
2306

   
2306
    	PhysicalPlan ep = new PhysicalPlan();
2307
    	PhysicalPlan ep = new PhysicalPlan();
2307
    	POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
2308
    	POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
2308
    	            new FuncSpec(GetMemNumRows.class.getName(), (String[])null));
2309
    	            new FuncSpec(GetMemNumRows.class.getName(), (String[])null));
2309
    	uf.setResultType(DataType.TUPLE);
2310
    	uf.setResultType(DataType.TUPLE);
2310
    	ep.add(uf);     
2311
    	ep.add(uf);
2311
    	ep.add(prjStar);
2312
    	ep.add(prjStar);
2312
    	ep.connect(prjStar, uf);
2313
    	ep.connect(prjStar, uf);
2313

    
   
2314

   
2314
        transformPlans.add(ep);      
2315
        transformPlans.add(ep);
2315
        
2316

   
2316
    	try{    		
2317
    	try{
2317
    		// pass configurations to the User Function
2318
    		// pass configurations to the User Function
2318
    		String per = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage", 
2319
    		String per = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage",
2319
                                   String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE));
2320
                                   String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE));
2320
    		String mc = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0");
2321
    		String mc = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0");
2321
    		String inputFile = lFile.getFileName();
2322
    		String inputFile = lFile.getFileName();
2322

    
   
2323

   
2323
    		return getSamplingJob(sort, prevJob, transformPlans, lFile, sampleFile, rp, null, 
2324
    		return getSamplingJob(sort, prevJob, transformPlans, lFile, sampleFile, rp, null,
2324
    							PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile}, PoissonSampleLoader.class.getName());
2325
    							PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile}, PoissonSampleLoader.class.getName());
2325
    	}catch(Exception e) {
2326
    	}catch(Exception e) {
2326
    		throw new PlanException(e);
2327
    		throw new PlanException(e);
2327
    	}
2328
    	}
2328
    }    	 
2329
    }
2329
  
2330

   
2330
  	
2331

   
2331
    /**
2332
    /**
2332
     * Create a sampling job to collect statistics by sampling an input file. The sequence of operations is as
2333
     * Create a sampling job to collect statistics by sampling an input file. The sequence of operations is as
2333
     * following:
2334
     * following:
2334
     * <li>Transform input sample tuples into another tuple.</li>
2335
     * <li>Transform input sample tuples into another tuple.</li>
2335
     * <li>Add an extra field &quot;all&quot; into the tuple </li>
2336
     * <li>Add an extra field &quot;all&quot; into the tuple </li>
2336
     * <li>Package all tuples into one bag </li>
2337
     * <li>Package all tuples into one bag </li>
2337
     * <li>Add constant field for number of reducers. </li>
2338
     * <li>Add constant field for number of reducers. </li>
2338
     * <li>Sorting the bag </li>
2339
     * <li>Sorting the bag </li>
2339
     * <li>Invoke UDF with the number of reducers and the sorted bag.</li>
2340
     * <li>Invoke UDF with the number of reducers and the sorted bag.</li>
2340
     * <li>Data generated by UDF is stored into a file.</li>
2341
     * <li>Data generated by UDF is stored into a file.</li>
2341
     * 
2342
     *
2342
     * @param sort  the POSort operator used to sort the bag
2343
     * @param sort  the POSort operator used to sort the bag
2343
     * @param prevJob  previous job of current sampling job
2344
     * @param prevJob  previous job of current sampling job
2344
     * @param transformPlans  PhysicalPlans to transform input samples
2345
     * @param transformPlans  PhysicalPlans to transform input samples
2345
     * @param lFile  path of input file
2346
     * @param lFile  path of input file
2346
     * @param sampleFile  path of output file
2347
     * @param sampleFile  path of output file
[+20] [20] 6 lines
[+20] private MapReduceOper getSortJob(
2353
     * @throws PlanException
2354
     * @throws PlanException
2354
     * @throws VisitorException
2355
     * @throws VisitorException
2355
     */
2356
     */
2356
  	@SuppressWarnings("deprecation")
2357
  	@SuppressWarnings("deprecation")
2357
    private Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, MapReduceOper prevJob, List<PhysicalPlan> transformPlans,
2358
    private Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, MapReduceOper prevJob, List<PhysicalPlan> transformPlans,
2358
  			FileSpec lFile, FileSpec sampleFile, int rp, List<PhysicalPlan> sortKeyPlans, 
2359
  			FileSpec lFile, FileSpec sampleFile, int rp, List<PhysicalPlan> sortKeyPlans,
2359
  			String udfClassName, String[] udfArgs, String sampleLdrClassName ) throws PlanException, VisitorException {
2360
  			String udfClassName, String[] udfArgs, String sampleLdrClassName ) throws PlanException, VisitorException {
2360
  		
2361

   
2361
  		String[] rslargs = new String[2];
2362
  		String[] rslargs = new String[2];
2362
        // SampleLoader expects string version of FuncSpec 
2363
        // SampleLoader expects string version of FuncSpec
2363
        // as its first constructor argument.
2364
        // as its first constructor argument.
2364
        
2365

   
2365
        rslargs[0] = (new FuncSpec(Utils.getTmpFileCompressorName(pigContext))).toString();
2366
        rslargs[0] = (new FuncSpec(Utils.getTmpFileCompressorName(pigContext))).toString();
2366
        
2367

   
2367
        rslargs[1] = "100"; // The value is calculated based on the file size for skewed join
2368
        rslargs[1] = "100"; // The value is calculated based on the file size for skewed join
2368
        FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),
2369
        FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),
2369
        		new FuncSpec(sampleLdrClassName, rslargs));
2370
        		new FuncSpec(sampleLdrClassName, rslargs));
2370
        
2371

   
2371
        MapReduceOper mro = startNew(quantLdFilName, prevJob);
2372
        MapReduceOper mro = startNew(quantLdFilName, prevJob);
2372
       
2373

   
2373
        if(sort.isUDFComparatorUsed) {
2374
        if(sort.isUDFComparatorUsed) {
2374
            mro.UDFs.add(sort.getMSortFunc().getFuncSpec().toString());
2375
            mro.UDFs.add(sort.getMSortFunc().getFuncSpec().toString());
2375
            curMROp.isUDFComparatorUsed = true;
2376
            curMROp.isUDFComparatorUsed = true;
2376
        }        
2377
        }
2377
    
2378

   
2378
        List<Boolean> flat1 = new ArrayList<Boolean>();         
2379
        List<Boolean> flat1 = new ArrayList<Boolean>();
2379
        List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
2380
        List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
2380
        
2381

   
2381
        // if transform plans are not specified, project the columns of sorting keys
2382
        // if transform plans are not specified, project the columns of sorting keys
2382
        if (transformPlans == null) {        	
2383
        if (transformPlans == null) {
2383
            Pair<POProject, Byte>[] sortProjs = null;
2384
            Pair<POProject, Byte>[] sortProjs = null;
2384
            try{
2385
            try{
2385
            	sortProjs = getSortCols(sort.getSortPlans());
2386
            	sortProjs = getSortCols(sort.getSortPlans());
2386
            }catch(Exception e) {
2387
            }catch(Exception e) {
2387
            	throw new RuntimeException(e);
2388
            	throw new RuntimeException(e);
2388
            }
2389
            }
2389
            // Set up the projections of the key columns 
2390
            // Set up the projections of the key columns
2390
            if (sortProjs == null) {
2391
            if (sortProjs == null) {
2391
                PhysicalPlan ep = new PhysicalPlan();
2392
                PhysicalPlan ep = new PhysicalPlan();
2392
                POProject prj = new POProject(new OperatorKey(scope,
2393
                POProject prj = new POProject(new OperatorKey(scope,
2393
                    nig.getNextNodeId(scope)));
2394
                    nig.getNextNodeId(scope)));
2394
                prj.setStar(true);
2395
                prj.setStar(true);
2395
                prj.setOverloaded(false);
2396
                prj.setOverloaded(false);
2396
                prj.setResultType(DataType.TUPLE);
2397
                prj.setResultType(DataType.TUPLE);
2397
                ep.add(prj);
2398
                ep.add(prj);
2398
                eps1.add(ep);
2399
                eps1.add(ep);
2399
                flat1.add(true);
2400
                flat1.add(true);
2400
            } else {
2401
            } else {
2401
                for (Pair<POProject, Byte> sortProj : sortProjs) {
2402
                for (Pair<POProject, Byte> sortProj : sortProjs) {
2402
                    // Check for proj being null, null is used by getSortCols for a non POProject
2403
                    // Check for proj being null, null is used by getSortCols for a non POProject
2403
                    // operator. Since Order by does not allow expression operators, 
2404
                    // operator. Since Order by does not allow expression operators,
2404
                    //it should never be set to null
2405
                    //it should never be set to null
2405
                    if(sortProj == null){
2406
                    if(sortProj == null){
2406
                        int errCode = 2174;
2407
                        int errCode = 2174;
2407
                        String msg = "Internal exception. Could not create a sampler job";
2408
                        String msg = "Internal exception. Could not create a sampler job";
2408
                        throw new MRCompilerException(msg, errCode, PigException.BUG);
2409
                        throw new MRCompilerException(msg, errCode, PigException.BUG);
[+20] [20] 18 lines
[+20] private MapReduceOper getSortJob(
2427
                eps1.add(transformPlans.get(i));
2428
                eps1.add(transformPlans.get(i));
2428
                flat1.add(true);
2429
                flat1.add(true);
2429
            }
2430
            }
2430
        }
2431
        }
2431

    
   
2432

   
2432
        // This foreach will pick the sort key columns from the RandomSampleLoader output 
2433
        // This foreach will pick the sort key columns from the RandomSampleLoader output
2433
        POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps1,flat1);
2434
        POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps1,flat1);
2434
        mro.mapPlan.addAsLeaf(nfe1);
2435
        mro.mapPlan.addAsLeaf(nfe1);
2435
        
2436

   
2436
        // Now set up a POLocalRearrange which has "all" as the key and the output of the
2437
        // Now set up a POLocalRearrange which has "all" as the key and the output of the
2437
        // foreach will be the "value" out of POLocalRearrange
2438
        // foreach will be the "value" out of POLocalRearrange
2438
        PhysicalPlan ep1 = new PhysicalPlan();
2439
        PhysicalPlan ep1 = new PhysicalPlan();
2439
        ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
2440
        ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
2440
        ce.setValue("all");
2441
        ce.setValue("all");
2441
        ce.setResultType(DataType.CHARARRAY);
2442
        ce.setResultType(DataType.CHARARRAY);
2442
        ep1.add(ce);
2443
        ep1.add(ce);
2443
        
2444

   
2444
        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
2445
        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
2445
        eps.add(ep1);
2446
        eps.add(ep1);
2446
        
2447

   
2447
        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
2448
        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
2448
        try {
2449
        try {
2449
            lr.setIndex(0);
2450
            lr.setIndex(0);
2450
        } catch (ExecException e) {
2451
        } catch (ExecException e) {
2451
        	int errCode = 2058;
2452
        	int errCode = 2058;
[+20] [20] 4 lines
[+20] private MapReduceOper getSortJob(
2456
        lr.setPlans(eps);
2457
        lr.setPlans(eps);
2457
        lr.setResultType(DataType.TUPLE);
2458
        lr.setResultType(DataType.TUPLE);
2458
        lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
2459
        lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
2459
        mro.mapPlan.add(lr);
2460
        mro.mapPlan.add(lr);
2460
        mro.mapPlan.connect(nfe1, lr);
2461
        mro.mapPlan.connect(nfe1, lr);
2461
        
2462

   
2462
        mro.setMapDone(true);
2463
        mro.setMapDone(true);
2463
        
2464

   
2464
        POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
2465
        POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
2465
        pkg.setKeyType(DataType.CHARARRAY);
2466
        pkg.setKeyType(DataType.CHARARRAY);
2466
        pkg.setNumInps(1);
2467
        pkg.setNumInps(1);
2467
        boolean[] inner = {false}; 
2468
        boolean[] inner = {false};
2468
        pkg.setInner(inner);
2469
        pkg.setInner(inner);
2469
        mro.reducePlan.add(pkg);
2470
        mro.reducePlan.add(pkg);
2470
        
2471

   
2471
        // Lets start building the plan which will have the sort
2472
        // Lets start building the plan which will have the sort
2472
        // for the foreach
2473
        // for the foreach
2473
        PhysicalPlan fe2Plan = new PhysicalPlan();
2474
        PhysicalPlan fe2Plan = new PhysicalPlan();
2474
        // Top level project which just projects the tuple which is coming 
2475
        // Top level project which just projects the tuple which is coming
2475
        // from the foreach after the package
2476
        // from the foreach after the package
2476
        POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
2477
        POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
2477
        topPrj.setColumn(1);
2478
        topPrj.setColumn(1);
2478
        topPrj.setResultType(DataType.BAG);
2479
        topPrj.setResultType(DataType.BAG);
2479
        topPrj.setOverloaded(true);
2480
        topPrj.setOverloaded(true);
2480
        fe2Plan.add(topPrj);
2481
        fe2Plan.add(topPrj);
2481
        
2482

   
2482
        // the projections which will form sort plans
2483
        // the projections which will form sort plans
2483
        List<PhysicalPlan> nesSortPlanLst = new ArrayList<PhysicalPlan>();             
2484
        List<PhysicalPlan> nesSortPlanLst = new ArrayList<PhysicalPlan>();
2484
        if (sortKeyPlans != null) {
2485
        if (sortKeyPlans != null) {
2485
        	for(int i=0; i<sortKeyPlans.size(); i++) {        	
2486
        	for(int i=0; i<sortKeyPlans.size(); i++) {
2486
        		nesSortPlanLst.add(sortKeyPlans.get(i));        	
2487
        		nesSortPlanLst.add(sortKeyPlans.get(i));
2487
        	}
2488
        	}
2488
        }else{   
2489
        }else{
2489
            Pair<POProject, Byte>[] sortProjs = null;
2490
            Pair<POProject, Byte>[] sortProjs = null;
2490
            try{
2491
            try{
2491
            	sortProjs = getSortCols(sort.getSortPlans());
2492
            	sortProjs = getSortCols(sort.getSortPlans());
2492
            }catch(Exception e) {
2493
            }catch(Exception e) {
2493
            	throw new RuntimeException(e);
2494
            	throw new RuntimeException(e);
2494
            }
2495
            }
2495
            // Set up the projections of the key columns 
2496
            // Set up the projections of the key columns
2496
            if (sortProjs == null) {
2497
            if (sortProjs == null) {
2497
                PhysicalPlan ep = new PhysicalPlan();
2498
                PhysicalPlan ep = new PhysicalPlan();
2498
                POProject prj = new POProject(new OperatorKey(scope,
2499
                POProject prj = new POProject(new OperatorKey(scope,
2499
                    nig.getNextNodeId(scope)));
2500
                    nig.getNextNodeId(scope)));
2500
                prj.setStar(true);
2501
                prj.setStar(true);
2501
                prj.setOverloaded(false);
2502
                prj.setOverloaded(false);
2502
                prj.setResultType(DataType.TUPLE);
2503
                prj.setResultType(DataType.TUPLE);
2503
                ep.add(prj);
2504
                ep.add(prj);
2504
                nesSortPlanLst.add(ep);
2505
                nesSortPlanLst.add(ep);
2505
            } else {
2506
            } else {
2506
                for (int i=0; i<sortProjs.length; i++) {
2507
                for (int i=0; i<sortProjs.length; i++) {
2507
                    POProject prj =
2508
                    POProject prj =
2508
                        new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
2509
                        new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
2509
                    
2510

   
2510
                    prj.setResultType(sortProjs[i].second);
2511
                    prj.setResultType(sortProjs[i].second);
2511
                    if(sortProjs[i].first != null && sortProjs[i].first.isProjectToEnd()){
2512
                    if(sortProjs[i].first != null && sortProjs[i].first.isProjectToEnd()){
2512
                        if(i != sortProjs.length -1){
2513
                        if(i != sortProjs.length -1){
2513
                            //project to end has to be the last sort column
2514
                            //project to end has to be the last sort column
2514
                            throw new AssertionError("Project-range to end (x..)" +
2515
                            throw new AssertionError("Project-range to end (x..)" +
[+20] [20] 9 lines
[+20] private MapReduceOper getSortJob(
2524

    
   
2525

   
2525
                    PhysicalPlan ep = new PhysicalPlan();
2526
                    PhysicalPlan ep = new PhysicalPlan();
2526
                    ep.add(prj);
2527
                    ep.add(prj);
2527
                    nesSortPlanLst.add(ep);
2528
                    nesSortPlanLst.add(ep);
2528
                }
2529
                }
2529
            }                       

   
2530
        }
2530
            }
2531
        
2531
        }

    
   
2532

   
2532
        sort.setSortPlans(nesSortPlanLst);
2533
        sort.setSortPlans(nesSortPlanLst);
2533
        sort.setResultType(DataType.BAG);
2534
        sort.setResultType(DataType.BAG);
2534
        fe2Plan.add(sort);
2535
        fe2Plan.add(sort);
2535
        fe2Plan.connect(topPrj, sort);
2536
        fe2Plan.connect(topPrj, sort);
2536
        
2537

   
2537
        // The plan which will have a constant representing the
2538
        // The plan which will have a constant representing the
2538
        // degree of parallelism for the final order by map-reduce job
2539
        // degree of parallelism for the final order by map-reduce job
2539
        // this will either come from a "order by parallel x" in the script
2540
        // this will either come from a "order by parallel x" in the script
2540
        // or will be the default number of reducers for the cluster if
2541
        // or will be the default number of reducers for the cluster if
2541
        // "parallel x" is not used in the script
2542
        // "parallel x" is not used in the script
2542
        PhysicalPlan rpep = new PhysicalPlan();
2543
        PhysicalPlan rpep = new PhysicalPlan();
2543
        ConstantExpression rpce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
2544
        ConstantExpression rpce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
2544
        rpce.setRequestedParallelism(rp);
2545
        rpce.setRequestedParallelism(rp);
2545
        
2546

   
2546
        // We temporarily set it to rp and will adjust it at runtime, because the final degree of parallelism
2547
        // We temporarily set it to rp and will adjust it at runtime, because the final degree of parallelism
2547
        // is unknown until we are ready to submit it. See PIG-2779.
2548
        // is unknown until we are ready to submit it. See PIG-2779.
2548
        rpce.setValue(rp);
2549
        rpce.setValue(rp);
2549
        
2550

   
2550
        rpce.setResultType(DataType.INTEGER);
2551
        rpce.setResultType(DataType.INTEGER);
2551
        rpep.add(rpce);
2552
        rpep.add(rpce);
2552
        
2553

   
2553
        List<PhysicalPlan> genEps = new ArrayList<PhysicalPlan>();
2554
        List<PhysicalPlan> genEps = new ArrayList<PhysicalPlan>();
2554
        genEps.add(rpep);
2555
        genEps.add(rpep);
2555
        genEps.add(fe2Plan);
2556
        genEps.add(fe2Plan);
2556
        
2557

   
2557
        List<Boolean> flattened2 = new ArrayList<Boolean>();
2558
        List<Boolean> flattened2 = new ArrayList<Boolean>();
2558
        flattened2.add(false);
2559
        flattened2.add(false);
2559
        flattened2.add(false);
2560
        flattened2.add(false);
2560
        
2561

   
2561
        POForEach nfe2 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1, genEps, flattened2);
2562
        POForEach nfe2 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1, genEps, flattened2);
2562
        mro.reducePlan.add(nfe2);
2563
        mro.reducePlan.add(nfe2);
2563
        mro.reducePlan.connect(pkg, nfe2);
2564
        mro.reducePlan.connect(pkg, nfe2);
2564
        
2565

   
2565
        // Let's connect the output from the foreach containing
2566
        // Let's connect the output from the foreach containing
2566
        // number of quantiles and the sorted bag of samples to
2567
        // number of quantiles and the sorted bag of samples to
2567
        // another foreach with the FindQuantiles udf. The input
2568
        // another foreach with the FindQuantiles udf. The input
2568
        // to the FindQuantiles udf is a project(*) which takes the 
2569
        // to the FindQuantiles udf is a project(*) which takes the
2569
        // foreach input and gives it to the udf
2570
        // foreach input and gives it to the udf
2570
        PhysicalPlan ep4 = new PhysicalPlan();
2571
        PhysicalPlan ep4 = new PhysicalPlan();
2571
        POProject prjStar4 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
2572
        POProject prjStar4 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
2572
        prjStar4.setResultType(DataType.TUPLE);
2573
        prjStar4.setResultType(DataType.TUPLE);
2573
        prjStar4.setStar(true);
2574
        prjStar4.setStar(true);
2574
        ep4.add(prjStar4);
2575
        ep4.add(prjStar4);
2575
        
2576

   
2576
        List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
2577
        List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
2577
        ufInps.add(prjStar4);
2578
        ufInps.add(prjStar4);
2578
      
2579

   
2579
        POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps, 
2580
        POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
2580
            new FuncSpec(udfClassName, udfArgs));
2581
            new FuncSpec(udfClassName, udfArgs));
2581
        ep4.add(uf);
2582
        ep4.add(uf);
2582
        ep4.connect(prjStar4, uf);
2583
        ep4.connect(prjStar4, uf);
2583
        
2584

   
2584
        List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
2585
        List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
2585
        ep4s.add(ep4);
2586
        ep4s.add(ep4);
2586
        List<Boolean> flattened3 = new ArrayList<Boolean>();
2587
        List<Boolean> flattened3 = new ArrayList<Boolean>();
2587
        flattened3.add(false);
2588
        flattened3.add(false);
2588
        POForEach nfe3 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ep4s, flattened3);
2589
        POForEach nfe3 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ep4s, flattened3);
2589
        
2590

   
2590
        mro.reducePlan.add(nfe3);
2591
        mro.reducePlan.add(nfe3);
2591
        mro.reducePlan.connect(nfe2, nfe3);
2592
        mro.reducePlan.connect(nfe2, nfe3);
2592
        
2593

   
2593
        POStore str = getStore();
2594
        POStore str = getStore();
2594
        str.setSFile(sampleFile);
2595
        str.setSFile(sampleFile);
2595
        
2596

   
2596
        mro.reducePlan.add(str);
2597
        mro.reducePlan.add(str);
2597
        mro.reducePlan.connect(nfe3, str);
2598
        mro.reducePlan.connect(nfe3, str);
2598
        
2599

   
2599
        mro.setReduceDone(true);
2600
        mro.setReduceDone(true);
2600
        mro.requestedParallelism = 1;
2601
        mro.requestedParallelism = 1;
2601
        mro.markSampler();
2602
        mro.markSampler();
2602
        return new Pair<MapReduceOper, Integer>(mro, rp);
2603
        return new Pair<MapReduceOper, Integer>(mro, rp);
2603
    }
2604
    }
2604

    
   
2605

   
2605
    static class LastInputStreamingOptimizer extends MROpPlanVisitor {
2606
    static class LastInputStreamingOptimizer extends MROpPlanVisitor {
2606
        String chunkSize;
2607
        String chunkSize;
2607
        LastInputStreamingOptimizer(MROperPlan plan, String chunkSize) {
2608
        LastInputStreamingOptimizer(MROperPlan plan, String chunkSize) {
2608
            super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
2609
            super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
2609
            this.chunkSize = chunkSize;
2610
            this.chunkSize = chunkSize;
2610
        }
2611
        }
2611
        
2612

   
2612
        /**indTupIter
2613
        /**indTupIter
2613
         * Look for pattern POPackage->POForEach(if both are flatten), change it to POJoinPackage
2614
         * Look for pattern POPackage->POForEach(if both are flatten), change it to POJoinPackage
2614
         * We can avoid materialize the input and construct the result of join on the fly
2615
         * We can avoid materialize the input and construct the result of join on the fly
2615
         * 
2616
         *
2616
         * @param mr - map-reduce plan to optimize
2617
         * @param mr - map-reduce plan to optimize
2617
         */ 
2618
         */
2618
        @Override
2619
        @Override
2619
        public void visitMROp(MapReduceOper mr) throws VisitorException {
2620
        public void visitMROp(MapReduceOper mr) throws VisitorException {
2620
            // Only optimize:
2621
            // Only optimize:
2621
            // 1. POPackage->POForEach is the root of reduce plan
2622
            // 1. POPackage->POForEach is the root of reduce plan
2622
            // 2. POUnion is the leaf of map plan (so that we exclude distinct, sort...)
2623
            // 2. POUnion is the leaf of map plan (so that we exclude distinct, sort...)
2623
            // 3. No combiner plan
2624
            // 3. No combiner plan
2624
            // 4. POForEach nested plan only contains POProject in any depth
2625
            // 4. POForEach nested plan only contains POProject in any depth
2625
            // 5. Inside POForEach, all occurrences of the last input are flattened
2626
            // 5. Inside POForEach, all occurrences of the last input are flattened
2626
            
2627

   
2627
            if (mr.mapPlan.isEmpty()) return;
2628
            if (mr.mapPlan.isEmpty()) return;
2628
            if (mr.reducePlan.isEmpty()) return;
2629
            if (mr.reducePlan.isEmpty()) return;
2629

    
   
2630

   
2630
            // Check combiner plan
2631
            // Check combiner plan
2631
            if (!mr.combinePlan.isEmpty()) {
2632
            if (!mr.combinePlan.isEmpty()) {
2632
                return;
2633
                return;
2633
            }
2634
            }
2634
            
2635

   
2635
            // Check map plan
2636
            // Check map plan
2636
            List<PhysicalOperator> mpLeaves = mr.mapPlan.getLeaves();
2637
            List<PhysicalOperator> mpLeaves = mr.mapPlan.getLeaves();
2637
            if (mpLeaves.size()!=1) {
2638
            if (mpLeaves.size()!=1) {
2638
                return;
2639
                return;
2639
            }
2640
            }
2640
            PhysicalOperator op = mpLeaves.get(0);
2641
            PhysicalOperator op = mpLeaves.get(0);
2641
            
2642

   
2642
            if (!(op instanceof POUnion)) {
2643
            if (!(op instanceof POUnion)) {
2643
                return;
2644
                return;
2644
            }
2645
            }
2645
            
2646

   
2646
            // Check reduce plan
2647
            // Check reduce plan
2647
            List<PhysicalOperator> mrRoots = mr.reducePlan.getRoots();
2648
            List<PhysicalOperator> mrRoots = mr.reducePlan.getRoots();
2648
            if (mrRoots.size()!=1) {
2649
            if (mrRoots.size()!=1) {
2649
                return;