Review Board 1.7.22


PIG-3501 Initial implementation of TezJobControlCompiler

Review Request #14505 - Created Oct. 5, 2013 and submitted

Cheolsoo Park
tez-branch
PIG-3501
Reviewers
pig
daijy, mwagner, rohini
pig-git
Initial implementation of TezJonControlCompiler that converts tez plan into tez dag. Tez dag is built by TezDagBuilder that is a dependency walker of TezOpPlan. At each node, it creates a tez vertex and connect it to dependent vertices with a tez edge.
Added unit test cases to TestTezJobControlCompiler.
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Revision 1d5bf73 New Change
[20] 502 lines
[+20] [+] private Job getJob(MROperPlan plan, MapReduceOper mro, Configuration config, PigContext pigContext) throws JobCreationException{
503
            {
503
            {
504

    
   
504

   
505
                // Setup the DistributedCache for this job
505
                // Setup the DistributedCache for this job
506
                for (URL extraJar : pigContext.extraJars) {
506
                for (URL extraJar : pigContext.extraJars) {
507
                    log.debug("Adding jar to DistributedCache: " + extraJar.toString());
507
                    log.debug("Adding jar to DistributedCache: " + extraJar.toString());
508
                    putJarOnClassPathThroughDistributedCache(pigContext, conf, extraJar);
508
                    Utils.putJarOnClassPathThroughDistributedCache(pigContext, conf, extraJar);
509
                }
509
                }
510

    
   
510

   
511
                //Create the jar of all functions and classes required
511
                //Create the jar of all functions and classes required
512
                File submitJarFile = File.createTempFile("Job", ".jar");
512
                File submitJarFile = File.createTempFile("Job", ".jar");
513
                log.info("creating jar file "+submitJarFile.getName());
513
                log.info("creating jar file "+submitJarFile.getName());
[+20] [20] 441 lines
[+20] [+] public static int estimateNumberOfReducers(org.apache.hadoop.mapreduce.Job job,
955
        return numberOfReducers;
955
        return numberOfReducers;
956
    }
956
    }
957

    
   
957

   
958
    public static class PigSecondaryKeyGroupComparator extends WritableComparator {
958
    public static class PigSecondaryKeyGroupComparator extends WritableComparator {
959
        public PigSecondaryKeyGroupComparator() {
959
        public PigSecondaryKeyGroupComparator() {
960
//            super(TupleFactory.getInstance().tupleClass(), true);

   
961
            super(NullableTuple.class, true);
960
            super(NullableTuple.class, true);
962
        }
961
        }
963

    
   
962

   
964
        @SuppressWarnings("unchecked")
963
        @SuppressWarnings("unchecked")
965
		@Override
964
		@Override
[+20] [20] 505 lines
[+20] [+] private static URI toURI(Path src) throws ExecException {
1471
                    "File doesn't exist: " + src;
1470
                    "File doesn't exist: " + src;
1472
            throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT);
1471
            throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT);
1473
        }
1472
        }
1474
    }
1473
    }
1475

    
   
1474

   
1476
    /**

   
1477
     * if url is not in HDFS will copy the path to HDFS from local before adding to distributed cache

   
1478
     * @param pigContext the pigContext

   
1479
     * @param conf the job conf

   
1480
     * @param url the url to be added to distributed cache

   
1481
     * @return the path as seen on distributed cache

   
1482
     * @throws IOException

   
1483
     */

   
1484
    @SuppressWarnings("deprecation")

   
1485
    private static void putJarOnClassPathThroughDistributedCache(

   
1486
            PigContext pigContext,

   
1487
            Configuration conf,

   
1488
            URL url) throws IOException {

   
1489

    
   

   
1490
        // Turn on the symlink feature

   
1491
        DistributedCache.createSymlink(conf);

   
1492

    
   

   
1493
        // REGISTER always copies locally the jar file. see PigServer.registerJar()

   
1494
        Path pathInHDFS = shipToHDFS(pigContext, conf, url);

   
1495
        // and add to the DistributedCache

   
1496
        DistributedCache.addFileToClassPath(pathInHDFS, conf);

   
1497
        pigContext.skipJars.add(url.getPath());

   
1498
    }

   
1499

    
   

   
1500
    /**

   
1501
     * copy the file to hdfs in a temporary path

   
1502
     * @param pigContext the pig context

   
1503
     * @param conf the job conf

   
1504
     * @param url the url to ship to hdfs

   
1505
     * @return the location where it was shipped

   
1506
     * @throws IOException

   
1507
     */

   
1508
    private static Path shipToHDFS(

   
1509
            PigContext pigContext,

   
1510
            Configuration conf,

   
1511
            URL url) throws IOException {

   
1512

    
   

   
1513
        String path = url.getPath();

   
1514
        int slash = path.lastIndexOf("/");

   
1515
        String suffix = slash == -1 ? path : path.substring(slash+1);

   
1516

    
   

   
1517
        Path dst = new Path(FileLocalizer.getTemporaryPath(pigContext).toUri().getPath(), suffix);

   
1518
        FileSystem fs = dst.getFileSystem(conf);

   
1519
        OutputStream os = fs.create(dst);

   
1520
        try {

   
1521
            IOUtils.copyBytes(url.openStream(), os, 4096, true);

   
1522
        } finally {

   
1523
            // IOUtils can not close both the input and the output properly in a finally

   
1524
            // as we can get an exception in between opening the stream and calling the method

   
1525
            os.close();

   
1526
        }

   
1527
        return dst;

   
1528
    }

   
1529

    
   

   
1530

    
   

   
1531
    private static class JoinDistributedCacheVisitor extends PhyPlanVisitor {
1475
    private static class JoinDistributedCacheVisitor extends PhyPlanVisitor {
1532

    
   
1476

   
1533
        private PigContext pigContext = null;
1477
        private PigContext pigContext = null;
1534

    
   
1478

   
1535
        private Configuration conf = null;
1479
        private Configuration conf = null;
[+20] [20] 163 lines
src/org/apache/pig/backend/hadoop/executionengine/tez/DagUtils.java
Revision abd71ad New Change
 
src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Revision e69de29 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
Revision f6a98d0 New Change
 
src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
Revision 213ef33 New Change
 
src/org/apache/pig/impl/util/Utils.java
Revision b5cffcf New Change
 
  1. src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java: Loading...
  2. src/org/apache/pig/backend/hadoop/executionengine/tez/DagUtils.java: Loading...
  3. src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java: Loading...
  4. src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java: Loading...
  5. src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java: Loading...
  6. src/org/apache/pig/impl/util/Utils.java: Loading...